//! [Simplex](crate::simplex)-like BFT agreement with an embedded VRF and succinct consensus certificates.
//!
//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `threshold-simplex` provides
//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal finalization
//! latency in a partially synchronous setting. Unlike Simplex Consensus, however, `threshold-simplex` employs threshold
//! cryptography (specifically BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum) to generate both
//! a bias-resistant beacon (for leader election and post-facto execution randomness) and succinct consensus certificates
//! (any certificate can be verified with just the static public key of the consensus instance) for each view
//! with zero message overhead (natively integrated).
//!
//! _If you wish to deploy Simplex Consensus but can't employ threshold signatures, see
//! [crate::simplex]._
//!
//! # Features
//!
//! * Wicked Fast Block Times (2 Network Hops)
//! * Optimal Finalization Latency (3 Network Hops)
//! * Externalized Uptime and Fault Proofs
//! * Decoupled Block Broadcast and Sync
//! * Lazy Message Verification
//! * Flexible Block Format
//! * Embedded VRF for Leader Election and Post-Facto Execution Randomness
//! * Succinct Consensus Certificates for Notarization, Nullification, and Finality
//!
//! # Design
//!
//! ## Architecture
//!
//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
//! is responsible for directing participation in the current view. Lastly, the `Resolver` is responsible for
//! fetching artifacts from previous views required to verify proposed blocks in the latest view.
//!
//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
//!
//! ```txt
//!                            +------------+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Batcher   |          +    Peers    +
//!                            |            |<---------+             +
//!                            +-------+----+          +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//! +---------------+           +---------+            +++++++++++++++
//! |               |<----------+         +----------->+             +
//! |  Application  |           |  Voter  |            +    Peers    +
//! |               +---------->|         |<-----------+             +
//! +---------------+           +--+------+            +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//!                            +-------+----+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Resolver  |          +    Peers    +
//!                            |            |<---------+             +
//!                            +------------+          +++++++++++++++
//! ```
//!
//! ## Joining Consensus
//!
//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
//! latest view and begin participating in consensus (assuming it can verify blocks).
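/*!
The view-advance rule above can be sketched as follows, assuming `n = 3f+1` participants. The names
`quorum`, `Tally`, and `next_view` are hypothetical (they are not part of this module's API), the counters
are assumed to hold distinct, already-verified signers, and `notarizes`/`finalizes` are assumed to be
counted for a single container.

```rust
/// Smallest vote count that tolerates `f = (n - 1) / 3` faults among `n`
/// participants. For `n = 3f+1` this equals `2f+1`.
fn quorum(n: u32) -> u32 {
    assert!(n > 0, "need at least one participant");
    let f = (n - 1) / 3;
    n - f
}

/// Hypothetical per-view counters of distinct signers seen so far.
#[derive(Default)]
struct Tally {
    notarizes: u32,
    nullifies: u32,
    finalizes: u32,
}

/// Returns `Some(v + 1)` once any message kind has reached quorum in view `v`.
fn next_view(v: u64, n: u32, tally: &Tally) -> Option<u64> {
    let q = quorum(n);
    if tally.notarizes >= q || tally.nullifies >= q || tally.finalizes >= q {
        Some(v + 1)
    } else {
        None
    }
}
```

Because the rule depends only on messages observed for view `v`, a participant that was offline can
apply it to the most recent view it hears about rather than replaying every earlier view.
*/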
//!
//! ## Persistence
//!
//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
//! the critical path. To enable recovery, the `Voter` writes valid messages it receives from
//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
//! on restart (especially in the case of unclean shutdown).
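/*!
A minimal sketch of the "sync before send" ordering described above. The `Wal` trait, `MemoryWal`, and
`persist_then_send` are hypothetical stand-ins for illustration; they do not mirror the `Journal` API.

```rust
/// Hypothetical write-ahead log interface (not the `Journal` API).
trait Wal {
    /// Buffers an entry; it is not yet guaranteed to survive a crash.
    fn append(&mut self, msg: &[u8]);
    /// Makes every appended entry durable.
    fn sync(&mut self);
}

/// In-memory stand-in that tracks how many entries have been "synced".
#[derive(Default)]
struct MemoryWal {
    entries: Vec<Vec<u8>>,
    durable: usize,
}

impl Wal for MemoryWal {
    fn append(&mut self, msg: &[u8]) {
        self.entries.push(msg.to_vec());
    }

    fn sync(&mut self) {
        self.durable = self.entries.len();
    }
}

/// Records `msg` durably before handing it to `send`, so a restarted node
/// can replay the log and avoid contradicting a message it already sent.
fn persist_then_send<W: Wal, F: FnMut(&[u8])>(wal: &mut W, msg: &[u8], mut send: F) {
    wal.append(msg);
    wal.sync();
    send(msg);
}
```

If the order were reversed (send, then sync), a crash between the two steps could leave peers holding a
vote that the restarted node has no record of, which is the inadvertent Byzantine behavior the sync
guards against.
*/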
//!
//! ## Batched Verification
//!
//! Unlike other consensus constructions that verify all incoming messages received from peers,
//! `threshold-simplex` lazily verifies messages (only when a quorum is met). If an invalid signature
//! is detected, the `Batcher` will perform repeated bisections over collected messages to find the
//! offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
//!
//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
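/*!
The bisection search described above can be sketched as follows. This is an illustration only:
`Message`, `verify_batch`, and `find_invalid` are hypothetical names, and the `valid` flag stands in
for the outcome of a real batch signature check.

```rust
/// Hypothetical message; `valid` stands in for a real signature check.
struct Message {
    sender: u32,
    valid: bool,
}

/// Stand-in for a batch signature check: passes only if every message is valid.
fn verify_batch(batch: &[Message]) -> bool {
    batch.iter().all(|m| m.valid)
}

/// Recursively halves `batch`, collecting the sender of each invalid message.
/// A half that passes the batch check is accepted without inspecting its members.
fn find_invalid(batch: &[Message], offenders: &mut Vec<u32>) {
    if batch.is_empty() || verify_batch(batch) {
        return;
    }
    if batch.len() == 1 {
        offenders.push(batch[0].sender);
        return;
    }
    let (left, right) = batch.split_at(batch.len() / 2);
    find_invalid(left, offenders);
    find_invalid(right, offenders);
}
```

When every message is valid, this costs a single batch check. Each invalid message adds roughly
`2 * log2(n)` further checks, so the approach pays off when invalid signatures are rare.
*/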
//!
//! ## Protocol Description
//!
//! ### Specification for View `v`
//!
//! Upon entering view `v`:
//! * Determine leader `l` for view `v`
//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
//!   * If leader `l` has not been active in last `r` views, set `t_l` to 0.
//! * If leader `l`, broadcast `(part(v), notarize(c,v))`
//!   * If unable to propose a container in view `v` because a notarization/nullification for a
//!     previous view `v_m` is missing, request `v_m`
//!
//! Upon receiving first `(part(v), notarize(c,v))` from `l`:
//! * Cancel `t_l`
//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
//!   between `v` and `v_parent`, verify `c` and broadcast `(part(v), notarize(c,v))`
//!
//! Upon receiving `2f+1` `(part(v), notarize(c,v))`:
//! * Cancel `t_a`
//! * Mark `c` as notarized
//! * Broadcast `(seed(v), notarization(c,v))` (even if we have not verified `c`)
//! * If we have not broadcast `(part(v), nullify(v))`, broadcast `finalize(c,v)`
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `(part(v), nullify(v))`:
//! * Broadcast `(seed(v), nullification(v))`
//!   * If we observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
//!     `nullification(*)` between `v_parent` and `v`. If `c_parent` is older than the last finalized container,
//!     broadcast the last finalization instead.
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `finalize(c,v)`:
//! * Mark `c` as finalized (and recursively finalize its parents)
//! * Broadcast `(seed(v), finalization(c,v))` (even if we have not verified `c`)
//!
//! Upon `t_l` or `t_a` firing:
//! * Broadcast `(part(v), nullify(v))`
//! * Every `t_r` after `(part(v), nullify(v))` broadcast that we are still in view `v`:
//!   * Rebroadcast `(part(v), nullify(v))` and either `(seed(v-1), notarization(v-1))` or `(seed(v-1), nullification(v-1))`
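/*!
As an illustration of the timer setup on view entry, the sketch below derives `t_l` and `t_a` from `Δ`.
`Timers` and `view_timers` are hypothetical names, and whether the leader counts as recently active
(within the last `r` views) is assumed to be decided by the caller.

```rust
use std::time::Duration;

/// Hypothetical timeouts armed when a view is entered.
#[derive(Debug, PartialEq)]
struct Timers {
    /// `t_l`: how long to wait for the leader's proposal.
    leader: Duration,
    /// `t_a`: how long to wait before giving up on the view.
    advance: Duration,
}

/// Computes `t_l = 2Δ` and `t_a = 3Δ`, dropping `t_l` to zero when the
/// leader has not been active in the last `r` views.
fn view_timers(delta: Duration, leader_recently_active: bool) -> Timers {
    let leader = if leader_recently_active {
        delta * 2
    } else {
        Duration::ZERO
    };
    Timers {
        leader,
        advance: delta * 3,
    }
}
```

A zero `t_l` fires immediately, so the participant broadcasts `(part(v), nullify(v))` without waiting
for a leader that is unlikely to propose.
*/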
//!
//! #### Embedded VRF
//!
//! When broadcasting any `notarize(c,v)` or `nullify(v)` message, a participant must also include a `part(v)` message (a partial
//! signature over the view `v`). After `2f+1` `notarize(c,v)` or `nullify(v)` messages are collected from unique participants,
//! `seed(v)` can be recovered. Because `part(v)` is only over the view `v`, the seed derived for a given view `v` is the same regardless of
//! whether or not a block was notarized in said view `v`.
//!
//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
//! is used as a seed in `v`).
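/*!
A sketch of how `seed(v)` could select the leader for `v+1`. How the seed is mapped to a leader is not
specified in this section, so the reduction below (the first 8 bytes of the seed modulo the participant
count) is an assumption, and `leader_index` is a hypothetical helper. A deployment would more likely
hash the seed before reducing it.

```rust
/// Picks the leader index for view `v + 1` from the bytes of `seed(v)`.
fn leader_index(seed: &[u8], participants: usize) -> usize {
    assert!(participants > 0, "need at least one participant");
    // Interpret the first 8 bytes of the seed as a big-endian integer,
    // zero-padding on the right if the seed is shorter than 8 bytes.
    let mut prefix = [0u8; 8];
    let len = seed.len().min(8);
    prefix[..len].copy_from_slice(&seed[..len]);
    (u64::from_be_bytes(prefix) % participants as u64) as usize
}
```

Every participant that recovers the same `seed(v)` computes the same index, so no extra round of
communication is needed to agree on the next leader.
*/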
//!
//! #### Succinct Consensus Certificates
//!
//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain partial signatures for a static
//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
//! to operate a lite client).
//!
//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
//!
//! ### Deviations from Simplex Consensus
//!
//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
//!   a set of all notarizations/nullifications for all historical blocks.
//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
//!   either a "block" or a "dummy block", respectively.
//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
//!   some number of views (again to trigger early view transition for an unresponsive leader).
//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).

use types::View;

pub mod types;

cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
    }
}

#[cfg(test)]
pub mod mocks;

/// The minimum view we are tracking both in-memory and on-disk.
pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
    last_finalized.saturating_sub(activity_timeout)
}

/// Whether or not a view is interesting to us. This is a function
/// of both `min_active` and whether or not the view is too far
/// in the future (based on the view we are currently in).
pub(crate) fn interesting(
    activity_timeout: View,
    last_finalized: View,
    current: View,
    pending: View,
    allow_future: bool,
) -> bool {
    if pending < min_active(activity_timeout, last_finalized) {
        return false;
    }
    if !allow_future && pending > current + 1 {
        return false;
    }
    true
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{threshold_simplex::types::seed_namespace, Monitor};
    use commonware_cryptography::{
        bls12381::{
            dkg::ops,
            primitives::{
                poly::public,
                variant::{MinPk, MinSig, Variant},
            },
            tle::{decrypt, encrypt, Block},
        },
        ed25519::PrivateKey,
        PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
    };
    use commonware_macros::{select, test_traced};
    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
    use commonware_utils::{quorum, NZUsize, NZU32};
    use engine::Engine;
    use futures::{future::join_all, StreamExt};
    use governor::Quota;
    use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
    use std::{
        collections::{BTreeMap, HashMap},
        num::NonZeroUsize,
        sync::{Arc, Mutex},
        time::Duration,
    };
    use tracing::{debug, warn};
    use types::Activity;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Registers all validators using the oracle.
    async fn register_validators<P: PublicKey>(
        oracle: &mut Oracle<P>,
        validators: &[P],
    ) -> HashMap<
        P,
        (
            (Sender<P>, Receiver<P>),
            (Sender<P>, Receiver<P>),
            (Sender<P>, Receiver<P>),
        ),
    > {
        let mut registrations = HashMap::new();
        for validator in validators.iter() {
            let (pending_sender, pending_receiver) =
                oracle.register(validator.clone(), 0).await.unwrap();
            let (recovered_sender, recovered_receiver) =
                oracle.register(validator.clone(), 1).await.unwrap();
            let (resolver_sender, resolver_receiver) =
                oracle.register(validator.clone(), 2).await.unwrap();
            registrations.insert(
                validator.clone(),
                (
                    (pending_sender, pending_receiver),
                    (recovered_sender, recovered_receiver),
                    (resolver_sender, resolver_receiver),
                ),
            );
        }
        registrations
    }

    /// Enum to describe the action to take when linking validators.
    enum Action {
        Link(Link),
        Update(Link), // Unlink and then link
        Unlink,
    }

    /// Links (or unlinks) validators using the oracle.
    ///
    /// The `action` parameter determines the action (e.g. link, unlink) to take.
    /// The `restrict_to` function can be used to restrict the linking to certain connections,
    /// otherwise all validators will be linked to all other validators.
    async fn link_validators<P: PublicKey>(
        oracle: &mut Oracle<P>,
        validators: &[P],
        action: Action,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                // Ignore self
                if v2 == v1 {
                    continue;
                }

                // Restrict to certain connections
                if let Some(f) = restrict_to {
                    if !f(validators.len(), i1, i2) {
                        continue;
                    }
                }

                // Do any unlinking first
                match action {
                    Action::Update(_) | Action::Unlink => {
                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                    }
                    _ => {}
                }

                // Do any linking after
                match action {
                    Action::Link(ref link) | Action::Update(ref link) => {
                        oracle
                            .add_link(v1.clone(), v2.clone(), link.clone())
                            .await
                            .unwrap();
                    }
                    _ => {}
                }
            }
        }
    }

    fn all_online<V: Variant>() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let latest_complete = required_containers - activity_timeout;
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure seeds for all views
                {
                    let seeds = supervisor.seeds.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure seed for every view
                        if !seeds.contains_key(&view) {
                            panic!("view: {view}");
                        }
                    }
                }

                // Ensure no forks
                let mut notarized = HashMap::new();
                let mut finalized = HashMap::new();
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure only one payload proposed per view
                        let Some(payloads) = notarizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("view: {view}");
                        }
                        let (digest, notarizers) = payloads.iter().next().unwrap();
                        notarized.insert(view, *digest);

                        if notarizers.len() < threshold as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {view}");
                        }
                    }
                }
                {
                    let notarizations = supervisor.notarizations.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure notarization matches digest from notarizes
                        let Some(notarization) = notarizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = notarized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&notarization.proposal.payload, digest);
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure only one payload proposed per view
                        let Some(payloads) = finalizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("view: {view}");
                        }
                        let (digest, finalizers) = payloads.iter().next().unwrap();
                        finalized.insert(view, *digest);

                        // Only check at views below timeout
                        if view > latest_complete {
                            continue;
                        }

                        // Ensure everyone participating
                        if finalizers.len() < threshold as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {view}");
                        }

                        // Ensure no nullifies for any finalizers
                        let nullifies = supervisor.nullifies.lock().unwrap();
                        let Some(nullifies) = nullifies.get(&view) else {
                            continue;
                        };
                        for (_, finalizers) in payloads.iter() {
                            for finalizer in finalizers.iter() {
                                if nullifies.contains(finalizer) {
                                    panic!("should not nullify and finalize at same view");
                                }
                            }
                        }
                    }
                }
                {
                    let finalizations = supervisor.finalizations.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure finalization matches digest from finalizes
                        let Some(finalization) = finalizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = finalized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&finalization.proposal.payload, digest);
                    }
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_all_online() {
        all_online::<MinPk>();
        all_online::<MinSig>();
    }

    fn unclean_shutdown<V: Variant>() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();

        // Derive threshold
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);

        // Random restarts every x seconds
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_ctx = None;

        loop {
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();
            let polynomial = polynomial.clone();
            let shares = shares.clone();

            let f = |mut context: deterministic::Context| async move {
                // Create simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                    },
                );

                // Start network
                network.start();

                // Register participants
                let mut schemes = Vec::new();
                let mut validators = Vec::new();
                for i in 0..n {
                    let scheme = PrivateKey::from_seed(i as u64);
                    let pk = scheme.public_key();
                    schemes.push(scheme);
                    validators.push(pk);
                }
                validators.sort();
                schemes.sort_by_key(|s| s.public_key());
                let mut registrations = register_validators(&mut oracle, &validators).await;

                // Link all validators
                let link = Link {
                    latency: 50.0,
                    jitter: 50.0,
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &validators, Action::Link(link), None).await;

                // Create engines
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut supervisors = HashMap::new();
                let mut engine_handlers = Vec::new();
                for (idx, scheme) in schemes.into_iter().enumerate() {
                    // Create scheme context
                    let context = context
                        .clone()
                        .with_label(&format!("validator-{}", scheme.public_key()));

                    // Configure engine
                    let validator = scheme.public_key();
                    let mut participants = BTreeMap::new();
                    participants.insert(
                        0,
                        (polynomial.clone(), validators.clone(), shares[idx].clone()),
                    );
                    let supervisor_config = mocks::supervisor::Config::<_, V> {
                        namespace: namespace.clone(),
                        participants,
                    };
                    let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                    supervisors.insert(validator.clone(), supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(scheme.public_key());
                    let cfg = config::Config {
                        crypto: scheme,
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);

                    // Start engine
                    let (pending, recovered, resolver) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(pending, recovered, resolver));
                }

                // Store all finalizer handles
                let mut finalizers = Vec::new();
                for (_, supervisor) in supervisors.iter_mut() {
711                // Store all finalizer handles
                let mut finalizers = Vec::new();
                for (_, supervisor) in supervisors.iter_mut() {
                    let (mut latest, mut monitor) = supervisor.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.next().await.expect("event missing");
                        }
                    }));
                }

                // Exit at random points for unclean shutdown of entire set
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                let result = select! {
                    _ = context.sleep(wait) => {
                        // Collect supervisors to check faults
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(supervisors);
                        (false, context)
                    },
                    _ = join_all(finalizers) => {
                        // Check supervisors for fault activity
                        let supervised = supervised.lock().unwrap();
                        for supervisors in supervised.iter() {
                            for (_, supervisor) in supervisors.iter() {
                                let faults = supervisor.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        (true, context)
                    }
                };

                // Ensure no blocked connections
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());

                result
            };

            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                deterministic::Runner::from(prev_ctx)
            } else {
                deterministic::Runner::timed(Duration::from_secs(30))
            }
            .start(f);

            // Check if we should exit
            if complete {
                break;
            }

            prev_ctx = Some(context.recover());
        }
    }

    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown::<MinPk>();
        unclean_shutdown::<MinSig>();
    }

    fn backfill<V: Variant>() {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(720));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except first
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1, // force many fetches
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Degrade network connections for online peers
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Wait for nullifications to accrue
            context.sleep(Duration::from_secs(120)).await;

            // Unlink second peer from all (except first)
            link_validators(
                &mut oracle,
                &validators,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Configure engine for first peer
            let scheme = schemes[0].clone();
            let validator = scheme.public_key();
            let context = context.with_label(&format!("validator-{validator}"));

            // Link first peer to all (except second)
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Restore network connections for all online peers
            let link = Link {
                latency: 10.0,
                jitter: 2.5,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure engine
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (polynomial.clone(), validators.clone(), shares[0].clone()),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(scheme.public_key());
            let cfg = config::Config {
                crypto: scheme,
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor: supervisor.clone(),
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start engine
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));

            // Wait for new engine to finalize the required containers
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            while latest < required_containers {
                latest = monitor.next().await.expect("event missing");
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_backfill() {
        backfill::<MinPk>();
        backfill::<MinSig>();
    }

    fn one_offline<V: Variant>() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let max_exceptions = 10;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except first
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let offline = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure offline node is never active
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(offline) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }
                {
                    let nullifies = supervisor.nullifies.lock().unwrap();
                    for (view, participants) in nullifies.iter() {
                        if participants.contains(offline) {
                            panic!("view: {view}");
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(offline) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }

                // Identify offline views
                let mut offline_views = Vec::new();
                {
                    let leaders = supervisor.leaders.lock().unwrap();
                    for (view, leader) in leaders.iter() {
                        if leader == offline {
                            offline_views.push(*view);
                        }
                    }
                }
                assert!(!offline_views.is_empty());

                // Ensure nullifies/nullification collected for offline node
                // (exceptions are counted per supervisor)
                let mut exceptions = 0;
                {
                    let nullifies = supervisor.nullifies.lock().unwrap();
                    for view in offline_views.iter() {
                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
                        if nullifies < threshold as usize {
                            warn!("missing expected view nullifies: {}", view);
                            exceptions += 1;
                        }
                    }
                }
                {
                    let nullifications = supervisor.nullifications.lock().unwrap();
                    for view in offline_views.iter() {
                        if !nullifications.contains_key(view) {
                            warn!("missing expected view nullification: {}", view);
                            exceptions += 1;
                        }
                    }
                }

                // Ensure exceptions within allowed
                assert!(exceptions <= max_exceptions);
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());

            // Ensure we are skipping views
            let encoded = context.encode();
            let lines = encoded.lines();
            let mut skipped_views = 0;
            let mut nodes_skipping = 0;
            for line in lines {
                if line.contains("_engine_voter_skipped_views_total") {
                    let parts: Vec<&str> = line.split_whitespace().collect();
                    if let Some(number_str) = parts.last() {
                        if let Ok(number) = number_str.parse::<u64>() {
                            if number > 0 {
                                nodes_skipping += 1;
                            }
                            if number > skipped_views {
                                skipped_views = number;
                            }
                        }
                    }
                }
            }
            assert!(
                skipped_views > 0,
                "expected skipped views to be greater than 0"
            );
            assert_eq!(
                nodes_skipping,
                n - 1,
                "expected all online nodes to be skipping views"
            );
        });
    }

    #[test_traced]
    fn test_one_offline() {
        one_offline::<MinPk>();
        one_offline::<MinSig>();
    }

    fn slow_validator<V: Variant>() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
1350            let link = Link {
1351                latency: 10.0,
1352                jitter: 1.0,
1353                success_rate: 1.0,
1354            };
1355            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1356
1357            // Derive threshold
1358            let (polynomial, shares) =
1359                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1360
1361            // Create engines
1362            let relay = Arc::new(mocks::relay::Relay::new());
1363            let mut supervisors = Vec::new();
1364            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());

                // Give the first validator far higher propose/verify latency than its peers
                let application_cfg = if idx_scheme == 0 {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10_000.0, 0.0),
                        verify_latency: (10_000.0, 5.0),
                    }
                } else {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    }
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait until every supervisor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let slow = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure the slow node never emits a notarize or finalize (it never finishes
                // verification in a timely manner)
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(slow) {
                                panic!("slow validator notarized in view: {view}");
                            }
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(slow) {
                                panic!("slow validator finalized in view: {view}");
                            }
                        }
                    }
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_slow_validator() {
        slow_validator::<MinPk>();
        slow_validator::<MinSig>();
    }

    fn all_recovery<V: Variant>() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 2;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(180));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators with very slow links
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Derive threshold shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for a virtual minute (nothing should be notarized or finalized)
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (_, mut monitor) = supervisor.subscribe().await;
                finalizers.push(
                    context
                        .with_label("finalizer")
                        .spawn(move |context| async move {
                            select! {
                                _timeout = context.sleep(Duration::from_secs(60)) => {},
                                _done = monitor.next() => {
                                    panic!("engine should not notarize or finalize anything");
                                }
                            }
                        }),
                );
            }
            join_all(finalizers).await;

            // Unlink all validators so the latest view can be read
            link_validators(&mut oracle, &validators, Action::Unlink, None).await;

            // Wait for a virtual minute (nothing should happen)
            context.sleep(Duration::from_secs(60)).await;

            // Get the latest nullified view across all supervisors
            let mut latest = 0;
            for supervisor in supervisors.iter() {
                let nullifies = supervisor.nullifies.lock().unwrap();
                let max = nullifies.keys().max().unwrap();
                if *max > latest {
                    latest = *max;
                }
            }

            // Restore links with low latency
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Wait until every supervisor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure quick recovery.
                //
                // If the skip timeout isn't implemented correctly, we may go many views before participants
                // start to consider a validator's proposal.
                {
                    // Ensure nearly all views starting at `latest` finalize
                    let mut found = 0;
                    let finalizations = supervisor.finalizations.lock().unwrap();
                    for i in latest..latest + activity_timeout {
                        if finalizations.contains_key(&i) {
                            found += 1;
                        }
                    }
                    assert!(found >= activity_timeout - 2, "found: {found}");
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_all_recovery() {
        all_recovery::<MinPk>();
        all_recovery::<MinSig>();
    }
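
    // Sketch (not part of the original suite): the "quick recovery" assertion in
    // `all_recovery` counts how many views in `latest..latest + activity_timeout`
    // were finalized and tolerates at most two misses. The helper name and the
    // use of a `BTreeSet<u64>` in place of the supervisor's finalization map are
    // assumptions made for illustration only.
    fn finalized_in_window_sketch(
        finalized: &std::collections::BTreeSet<u64>,
        latest: u64,
        window: u64,
    ) -> u64 {
        // `range` is half-open, matching `for i in latest..latest + window`
        finalized.range(latest..latest + window).count() as u64
    }

    #[test]
    fn test_finalized_in_window_sketch() {
        // Views 20..30 with two gaps (23 and 27) still satisfy `found >= 10 - 2`
        let finalized: std::collections::BTreeSet<u64> =
            (20..30).filter(|v| *v != 23 && *v != 27).collect();
        let found = finalized_in_window_sketch(&finalized, 20, 10);
        assert!(found >= 10 - 2, "found: {found}");
    }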

    fn partition<V: Variant>() {
        // Create context
        let n = 10;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(900));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait until every supervisor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Cut all links between validator halves
            fn separated(n: usize, a: usize, b: usize) -> bool {
                let m = n / 2;
                (a < m && b >= m) || (a >= m && b < m)
            }
            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;

            // Wait for any in-progress notarizations/finalizations to finish
            context.sleep(Duration::from_secs(10)).await;

            // Wait for a virtual minute (neither half can reach the threshold, so nothing
            // should be notarized or finalized)
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (_, mut monitor) = supervisor.subscribe().await;
                finalizers.push(
                    context
                        .with_label("finalizer")
                        .spawn(move |context| async move {
                            select! {
                                _timeout = context.sleep(Duration::from_secs(60)) => {},
                                _done = monitor.next() => {
                                    panic!("engine should not notarize or finalize anything");
                                }
                            }
                        }),
                );
            }
            join_all(finalizers).await;

            // Restore links
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(separated),
            )
            .await;

            // Wait until every supervisor advances by another `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                let required = latest + required_containers;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    #[ignore]
    fn test_partition() {
        partition::<MinPk>();
        partition::<MinSig>();
    }
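
    // Sketch (not part of the original suite): the `separated` predicate in
    // `partition` splits validator indices into the halves `[0, n / 2)` and
    // `[n / 2, n)` and selects every link that crosses the boundary. The helper
    // names below are hypothetical. With `n = 10`, each half holds 5 validators,
    // fewer than the 7 signers (`2f + 1` with `f = 3`) a certificate needs, which
    // is why nothing should finalize while the partition lasts.
    fn separated_sketch(n: usize, a: usize, b: usize) -> bool {
        let m = n / 2;
        // True exactly when `a` and `b` fall on different sides of `m`
        (a < m) != (b < m)
    }

    // Count ordered pairs `(a, b)` with `a != b` whose link the predicate selects.
    fn cut_links_sketch(n: usize) -> usize {
        (0..n)
            .flat_map(|a| (0..n).map(move |b| (a, b)))
            .filter(|&(a, b)| a != b && separated_sketch(n, a, b))
            .count()
    }

    #[test]
    fn test_separated_sketch() {
        assert!(separated_sketch(10, 0, 5));
        assert!(!separated_sketch(10, 0, 4));
        // 5 * 5 crossing pairs in each direction
        assert_eq!(cut_links_sketch(10), 50);
    }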

    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(5_000)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let degraded_link = Link {
                latency: 200.0,
                jitter: 150.0,
                success_rate: 0.5,
            };
            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait until every supervisor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());

            // Return the auditor state so callers can compare runs
            context.auditor().state()
        })
    }

    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links::<MinPk>(0);
        slow_and_lossy_links::<MinSig>(0);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism() {
        // Use slow and lossy links for the determinism check because it is
        // the most complex test.
        for seed in 1..6 {
            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
            assert_eq!(pk_state_1, pk_state_2);

            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
            assert_eq!(sig_state_1, sig_state_2);

            // Sanity check that different variants don't produce identical state
            assert_ne!(pk_state_1, sig_state_1);
        }
    }
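
    // Sketch (not part of the original suite): `test_determinism` relies on a
    // seeded run producing the same state every time. The standalone model below
    // illustrates that idea with a hand-rolled SplitMix64 generator deciding which
    // messages a lossy link delivers. It does not use the deterministic runtime
    // or its auditor, and every name here is hypothetical.
    fn splitmix64_sketch(state: &mut u64) -> u64 {
        *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = *state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    // Returns "<delivered>/<sent>:<trace>", where the trace holds one character
    // per message ('1' delivered, '0' dropped).
    fn lossy_trace_sketch(seed: u64, messages: usize, success_rate: f64) -> String {
        let mut state = seed;
        let mut delivered = 0usize;
        let mut trace = String::with_capacity(messages);
        for _ in 0..messages {
            // Map the top 53 bits of the draw to a float in [0, 1)
            let draw = (splitmix64_sketch(&mut state) >> 11) as f64 / (1u64 << 53) as f64;
            if draw < success_rate {
                delivered += 1;
                trace.push('1');
            } else {
                trace.push('0');
            }
        }
        format!("{delivered}/{messages}:{trace}")
    }

    #[test]
    fn test_lossy_trace_sketch() {
        // The same seed must reproduce the same trace
        assert_eq!(lossy_trace_sketch(1, 64, 0.5), lossy_trace_sketch(1, 64, 0.5));
        // A perfect link delivers every message
        assert_eq!(lossy_trace_sketch(1, 8, 1.0), "8/8:11111111");
    }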
2094
    /// Runs four validators where the first (by sorted public key) runs the mock
    /// `Conflicter` engine, then checks that honest validators still make progress,
    /// record only conflicting notarize/finalize faults against it, and block it.
    fn conflicter<V: Variant>(seed: u64) {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Generate the threshold polynomial and shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::conflicter::Config {
                        supervisor,
                        namespace: namespace.clone(),
                    };

                    let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
                        mocks::conflicter::Conflicter::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(scheme.public_key());
                    let cfg = config::Config {
                        crypto: scheme,
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest validators to reach `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let byz = &validators[0];
            let mut count_conflicting = 0;
            for supervisor in supervisors.iter() {
                // Ensure only faults for byz
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match fault {
                                Activity::ConflictingNotarize(_) => {
                                    count_conflicting += 1;
                                }
                                Activity::ConflictingFinalize(_) => {
                                    count_conflicting += 1;
                                }
                                _ => panic!("unexpected fault: {fault:?}"),
                            }
                        }
                    }
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }
            assert!(count_conflicting > 0);

            // Ensure conflicter is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_conflicter() {
        for seed in 0..5 {
            conflicter::<MinPk>(seed);
            conflicter::<MinSig>(seed);
        }
    }

    /// Runs four validators where the first (by sorted public key) runs the mock
    /// `Invalid` engine, then checks that honest validators still make progress,
    /// record no faults, each observe invalid signatures, and block it.
    fn invalid<V: Variant>(seed: u64) {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Generate the threshold polynomial and shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::invalid::Config {
                        supervisor,
                        namespace: namespace.clone(),
                    };

                    let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
                        mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(scheme.public_key());
                    let cfg = config::Config {
                        crypto: scheme,
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest validators to reach `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let mut invalid_count = 0;
            let byz = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Count invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    if *invalid > 0 {
                        invalid_count += 1;
                    }
                }
            }
            assert_eq!(invalid_count, n - 1);

            // Ensure invalid is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_invalid() {
        for seed in 0..5 {
            invalid::<MinPk>(seed);
            invalid::<MinSig>(seed);
        }
    }

    /// Runs four validators where the first (by sorted public key) runs the mock
    /// `Impersonator` engine, then checks that honest validators still make progress,
    /// record no faults or invalid signatures, and block it.
    fn impersonator<V: Variant>(seed: u64) {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Generate the threshold polynomial and shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::impersonator::Config {
                        supervisor,
                        namespace: namespace.clone(),
                    };

                    let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
                        mocks::impersonator::Impersonator::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(scheme.public_key());
                    let cfg = config::Config {
                        crypto: scheme,
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest validators to reach `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let byz = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure impersonator is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_impersonator() {
        for seed in 0..5 {
            impersonator::<MinPk>(seed);
            impersonator::<MinSig>(seed);
        }
    }

    /// Runs four validators where the first (by sorted public key) runs the mock
    /// `Nuller` engine, then checks that honest validators still make progress,
    /// record only `NullifyFinalize` faults against it, and block it.
    fn nuller<V: Variant>(seed: u64) {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Generate the threshold polynomial and shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (
                        polynomial.clone(),
                        validators.clone(),
                        shares[idx_scheme].clone(),
                    ),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::nuller::Config {
                        supervisor,
                        namespace: namespace.clone(),
                    };
                    let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(scheme.public_key());
                    let cfg = config::Config {
                        crypto: scheme,
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest validators to reach `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            let byz = &validators[0];
            let mut count_nullify_and_finalize = 0;
            for supervisor in supervisors.iter() {
                // Ensure only faults for byz
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match fault {
                                Activity::NullifyFinalize(_) => {
                                    count_nullify_and_finalize += 1;
                                }
                                _ => panic!("unexpected fault: {fault:?}"),
                            }
                        }
                    }
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }
            assert!(count_nullify_and_finalize > 0);

            // Ensure nuller is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_nuller() {
        for seed in 0..5 {
            nuller::<MinPk>(seed);
            nuller::<MinSig>(seed);
        }
    }

    fn outdated<V: Variant>(seed: u64) {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Generate the threshold polynomial and shares
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
2884            let mut supervisors = Vec::new();
2885            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2886                // Create scheme context
2887                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2888
2889                // Start engine
2890                let validator = scheme.public_key();
2891                let mut participants = BTreeMap::new();
2892                participants.insert(
2893                    0,
2894                    (
2895                        polynomial.clone(),
2896                        validators.clone(),
2897                        shares[idx_scheme].clone(),
2898                    ),
2899                );
2900                let supervisor_config = mocks::supervisor::Config::<_, V> {
2901                    namespace: namespace.clone(),
2902                    participants,
2903                };
2904                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2905                let (pending, recovered, resolver) = registrations
2906                    .remove(&validator)
2907                    .expect("validator should be registered");
2908                if idx_scheme == 0 {
2909                    let cfg = mocks::outdated::Config {
2910                        supervisor,
2911                        namespace: namespace.clone(),
2912                        view_delta: activity_timeout * 4,
2913                    };
2914                    let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
2915                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2916                    engine.start(pending);
2917                } else {
2918                    supervisors.push(supervisor.clone());
2919                    let application_cfg = mocks::application::Config {
2920                        hasher: Sha256::default(),
2921                        relay: relay.clone(),
2922                        participant: validator.clone(),
2923                        propose_latency: (10.0, 5.0),
2924                        verify_latency: (10.0, 5.0),
2925                    };
2926                    let (actor, application) = mocks::application::Application::new(
2927                        context.with_label("application"),
2928                        application_cfg,
2929                    );
2930                    actor.start();
2931                    let blocker = oracle.control(scheme.public_key());
2932                    let cfg = config::Config {
2933                        crypto: scheme,
2934                        blocker,
2935                        automaton: application.clone(),
2936                        relay: application.clone(),
2937                        reporter: supervisor.clone(),
2938                        supervisor,
2939                        partition: validator.to_string(),
2940                        compression: Some(3),
2941                        mailbox_size: 1024,
2942                        namespace: namespace.clone(),
2943                        leader_timeout: Duration::from_secs(1),
2944                        notarization_timeout: Duration::from_secs(2),
2945                        nullify_retry: Duration::from_secs(10),
2946                        fetch_timeout: Duration::from_secs(1),
2947                        activity_timeout,
2948                        skip_timeout,
2949                        max_fetch_count: 1,
2950                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2951                        fetch_concurrent: 1,
2952                        replay_buffer: NZUsize!(1024 * 1024),
2953                        write_buffer: NZUsize!(1024 * 1024),
2954                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2955                    };
2956                    let engine = Engine::new(context.with_label("engine"), cfg);
2957                    engine.start(pending, recovered, resolver);
2958                }
2959            }
2960
2961            // Wait for all engines to finish
2962            let mut finalizers = Vec::new();
2963            for supervisor in supervisors.iter_mut() {
2964                let (mut latest, mut monitor) = supervisor.subscribe().await;
2965                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2966                    while latest < required_containers {
2967                        latest = monitor.next().await.expect("event missing");
2968                    }
2969                }));
2970            }
2971            join_all(finalizers).await;
2972
2973            // Check supervisors for correct activity
2974            for supervisor in supervisors.iter() {
2975                // Ensure no faults
2976                {
2977                    let faults = supervisor.faults.lock().unwrap();
2978                    assert!(faults.is_empty());
2979                }
2980
2981                // Ensure no invalid signatures
2982                {
2983                    let invalid = supervisor.invalid.lock().unwrap();
2984                    assert_eq!(*invalid, 0);
2985                }
2986            }
2987
2988            // Ensure no blocked connections
2989            let blocked = oracle.blocked().await.unwrap();
2990            assert!(blocked.is_empty());
2991        });
2992    }
2993
2994    #[test_traced]
2995    #[ignore]
2996    fn test_outdated() {
2997        for seed in 0..5 {
2998            outdated::<MinPk>(seed);
2999            outdated::<MinSig>(seed);
3000        }
3001    }

    /// Runs 10 validators over lossy links (`success_rate: 0.98`) until every supervisor's
    /// monitor reaches `required_containers` (1,000), then checks that no faults, invalid
    /// signatures, or blocked peers were recorded.
    fn run_1k<V: Variant>() {
        // Create context
        let n = 10;
        let threshold = quorum(n);
        let required_containers = 1_000;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new();
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 80.0,
                jitter: 10.0,
                success_rate: 0.98,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (100.0, 50.0),
                    verify_latency: (50.0, 40.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = supervisor.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    #[ignore]
    fn test_1k() {
        run_1k::<MinPk>();
        run_1k::<MinSig>();
    }

    /// Encrypts a message to the seed of a future view (view 10) under the threshold public
    /// key, waits for that view's notarization, and decrypts with its seed signature.
    fn tle<V: Variant>() {
        // Create context
        let n = 4;
        let threshold = quorum(n);
        let namespace = b"consensus".to_vec();
        let activity_timeout = 100;
        let skip_timeout = 50;
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 5.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
            let public_key = *public::<V>(&polynomial);

            // Create engines and supervisors
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            let monitor_supervisor = Arc::new(Mutex::new(None));
            for (idx, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure engine
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );

                // Store first supervisor for monitoring
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.push(supervisor.clone());
                if idx == 0 {
                    *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
                }

                // Configure application
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_millis(100),
                    notarization_timeout: Duration::from_millis(200),
                    nullify_retry: Duration::from_millis(500),
                    fetch_timeout: Duration::from_millis(100),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Prepare TLE test data
            let target = 10u64; // Encrypt for view 10
            let target_bytes = target.to_be_bytes();
            let message_content = b"Secret message for future view10"; // 32 bytes
            let message = Block::new(*message_content);

            // Encrypt message for future view using threshold public key
            let seed_namespace = seed_namespace(&namespace);
            let ciphertext = encrypt::<_, V>(
                &mut context,
                public_key,
                (Some(&seed_namespace), &target_bytes),
                &message,
            );

            // Wait for consensus to reach the target view and then decrypt
            let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
            loop {
                // Wait for notarization
                context.sleep(Duration::from_millis(100)).await;
                let notarizations = supervisor.notarizations.lock().unwrap();
                let Some(notarization) = notarizations.get(&target) else {
                    continue;
                };

                // Decrypt the message using the seed signature
                let seed_signature = notarization.seed_signature;
                let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
                    .expect("Decryption should succeed with valid seed signature");
                assert_eq!(
                    message.as_ref(),
                    decrypted.as_ref(),
                    "Decrypted message should match original message"
                );
                break;
            }
        });
    }
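
/*
The `tle` test identifies its target view by the bytes of `target.to_be_bytes()`.
A minimal sketch of that encoding using only the standard library. The helper
names are hypothetical, and the encryption step itself is not modeled here.

```rust
// Hypothetical helper: encode a view number as 8 big-endian bytes, matching
// the `u64::to_be_bytes` call used for `target_bytes`.
fn encode_view_sketch(view: u64) -> [u8; 8] {
    view.to_be_bytes()
}

// Hypothetical helper: recover the view number from its big-endian encoding.
fn decode_view_sketch(bytes: [u8; 8]) -> u64 {
    u64::from_be_bytes(bytes)
}

fn main() {
    // View 10 is the target used in the test.
    let encoded = encode_view_sketch(10);
    println!("{:?}", encoded);
    println!("{}", decode_view_sketch(encoded));
}
```

Big-endian encoding is fixed-width, and comparing two encodings byte by byte
gives the same order as comparing the view numbers themselves.
*/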

    #[test_traced]
    fn test_tle() {
        tle::<MinPk>();
        tle::<MinSig>();
    }
}