commonware_consensus/threshold_simplex/
mod.rs

1//! [Simplex](crate::simplex)-like BFT agreement with an embedded VRF and succinct consensus certificates.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `threshold-simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal finalization
5//! latency in a partially synchronous setting. Unlike Simplex Consensus, however, `threshold-simplex` employs threshold
6//! cryptography (specifically BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum) to generate both
7//! a bias-resistant beacon (for leader election and post-facto execution randomness) and succinct consensus certificates
8//! (any certificate can be verified with just the static public key of the consensus instance) for each view
9//! with zero message overhead (natively integrated).
10//!
11//! _If you wish to deploy Simplex Consensus but can't employ threshold signatures, see
12//! [crate::simplex]._
13//!
14//! # Features
15//!
16//! * Wicked Fast Block Times (2 Network Hops)
17//! * Optimal Finalization Latency (3 Network Hops)
18//! * Externalized Uptime and Fault Proofs
19//! * Decoupled Block Broadcast and Sync
20//! * Lazy Message Verification
21//! * Flexible Block Format
22//! * Embedded VRF for Leader Election and Post-Facto Execution Randomness
23//! * Succinct Consensus Certificates for Notarization, Nullification, and Finality
24//!
25//! # Design
26//!
27//! ## Architecture
28//!
29//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
30//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
31//! is responsible for directing participation in the current view. Lastly, the `Resolver` is responsible for
32//! fetching artifacts from previous views required to verify proposed blocks in the latest view.
33//!
34//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
35//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
36//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
37//!
38//! ```txt
39//!                            +------------+          +++++++++++++++
40//!                            |            +--------->+             +
41//!                            |  Batcher   |          +    Peers    +
42//!                            |            |<---------+             +
43//!                            +-------+----+          +++++++++++++++
44//!                                |   ^
45//!                                |   |
46//!                                |   |
47//!                                |   |
48//!                                v   |
49//! +---------------+           +---------+            +++++++++++++++
50//! |               |<----------+         +----------->+             +
51//! |  Application  |           |  Voter  |            +    Peers    +
52//! |               +---------->|         |<-----------+             +
53//! +---------------+           +--+------+            +++++++++++++++
54//!                                |   ^
55//!                                |   |
56//!                                |   |
57//!                                |   |
58//!                                v   |
59//!                            +-------+----+          +++++++++++++++
60//!                            |            +--------->+             +
61//!                            |  Resolver  |          +    Peers    +
62//!                            |            |<---------+             +
63//!                            +------------+          +++++++++++++++
64//! ```
65//!
66//! ## Joining Consensus
67//!
68//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
69//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
70//! latest view and begin participating in consensus (assuming it can verify blocks).
71//!
72//! ## Persistence
73//!
74//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
75//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
76//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
77//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
78//! on restart (especially in the case of unclean shutdown).
79//!
80//! ## Batched Verification
81//!
82//! Unlike other consensus constructions that verify all incoming messages received from peers,
83//! `threshold-simplex` lazily verifies messages (only when a quorum is met). If an invalid signature
84//! is detected, the `Batcher` will perform repeated bisections over collected messages to find the
85//! offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
86//!
87//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
88//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
89//!
90//! ## Protocol Description
91//!
92//! ### Specification for View `v`
93//!
94//! Upon entering view `v`:
95//! * Determine leader `l` for view `v`
96//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
97//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
98//! * If leader `l`, broadcast `(part(v), notarize(c,v))`
99//!   * If can't propose container in view `v` because missing notarization/nullification for a
100//!     previous view `v_m`, request `v_m`
101//!
102//! Upon receiving first `(part(v), notarize(c,v))` from `l`:
103//! * Cancel `t_l`
104//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
105//!   between `v` and `v_parent`, verify `c` and broadcast `(part(v), notarize(c,v))`
106//!
107//! Upon receiving `2f+1` `(part(v), notarize(c,v))`:
108//! * Cancel `t_a`
109//! * Mark `c` as notarized
110//! * Broadcast `(seed(v), notarization(c,v))` (even if we have not verified `c`)
111//! * If have not broadcast `(part(v), nullify(v))`, broadcast `finalize(c,v)`
112//! * Enter `v+1`
113//!
114//! Upon receiving `2f+1` `(part(v), nullify(v))`:
115//! * Broadcast `(seed(v), nullification(v))`
116//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
117//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
118//!       instead.
119//! * Enter `v+1`
120//!
121//! Upon receiving `2f+1` `finalize(c,v)`:
122//! * Mark `c` as finalized (and recursively finalize its parents)
123//! * Broadcast `(seed(v), finalization(c,v))` (even if we have not verified `c`)
124//!
125//! Upon `t_l` or `t_a` firing:
126//! * Broadcast `(part(v), nullify(v))`
127//! * Every `t_r` after `(part(v), nullify(v))` broadcast that we are still in view `v`:
128//!    * Rebroadcast `(part(v), nullify(v))` and either `(seed(v-1), notarization(v-1))` or `(seed(v-1), nullification(v-1))`
129//!
130//! #### Embedded VRF
131//!
132//! When broadcasting any `notarize(c,v)` or `nullify(v)` message, a participant must also include a `part(v)` message (a partial
133//! signature over the view `v`). After `2f+1` `notarize(c,v)` or `nullify(v)` messages are collected from unique participants,
134//! `seed(v)` can be recovered. Because `part(v)` is only over the view `v`, the seed derived for a given view `v` is the same regardless of
135//! whether or not a block was notarized in said view `v`.
136//!
137//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
138//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
139//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
140//! is used as a seed in `v`).
141//!
142//! #### Succinct Consensus Certificates
143//!
144//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain partial signatures for a static
145//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
146//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
147//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
148//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
149//! to operate a lite client).
150//!
151//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
152//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
153//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
154//!
155//! ### Deviations from Simplex Consensus
156//!
157//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
158//!   a set of all notarizations/nullifications for all historical blocks.
159//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
160//!   either a "block" or a "dummy block", respectively.
161//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
162//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
163//!   some number of views (again to trigger early view transition for an unresponsive leader).
164//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
165//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
166
167use types::View;
168
169pub mod types;
170
171cfg_if::cfg_if! {
172    if #[cfg(not(target_arch = "wasm32"))] {
173        mod actors;
174        mod config;
175        pub use config::Config;
176        mod engine;
177        pub use engine::Engine;
178        mod metrics;
179    }
180}
181
182#[cfg(test)]
183pub mod mocks;
184
185/// The minimum view we are tracking both in-memory and on-disk.
186pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
187    last_finalized.saturating_sub(activity_timeout)
188}
189
190/// Whether or not a view is interesting to us. This is a function
191/// of both `min_active` and whether or not the view is too far
192/// in the future (based on the view we are currently in).
193pub(crate) fn interesting(
194    activity_timeout: View,
195    last_finalized: View,
196    current: View,
197    pending: View,
198    allow_future: bool,
199) -> bool {
200    if pending < min_active(activity_timeout, last_finalized) {
201        return false;
202    }
203    if !allow_future && pending > current + 1 {
204        return false;
205    }
206    true
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use crate::{threshold_simplex::types::seed_namespace, Monitor};
213    use commonware_cryptography::{
214        bls12381::{
215            dkg::ops,
216            primitives::{
217                poly::public,
218                variant::{MinPk, MinSig, Variant},
219            },
220            tle::{decrypt, encrypt, Block},
221        },
222        ed25519::PrivateKey,
223        PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
224    };
225    use commonware_macros::{select, test_traced};
226    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
227    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
228    use commonware_utils::{quorum, NZU32};
229    use engine::Engine;
230    use futures::{future::join_all, StreamExt};
231    use governor::Quota;
232    use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
233    use std::{
234        collections::{BTreeMap, HashMap},
235        sync::{Arc, Mutex},
236        time::Duration,
237    };
238    use tracing::{debug, warn};
239    use types::Activity;
240
241    /// Registers all validators using the oracle.
242    async fn register_validators<P: PublicKey>(
243        oracle: &mut Oracle<P>,
244        validators: &[P],
245    ) -> HashMap<
246        P,
247        (
248            (Sender<P>, Receiver<P>),
249            (Sender<P>, Receiver<P>),
250            (Sender<P>, Receiver<P>),
251        ),
252    > {
253        let mut registrations = HashMap::new();
254        for validator in validators.iter() {
255            let (pending_sender, pending_receiver) =
256                oracle.register(validator.clone(), 0).await.unwrap();
257            let (recovered_sender, recovered_receiver) =
258                oracle.register(validator.clone(), 1).await.unwrap();
259            let (resolver_sender, resolver_receiver) =
260                oracle.register(validator.clone(), 2).await.unwrap();
261            registrations.insert(
262                validator.clone(),
263                (
264                    (pending_sender, pending_receiver),
265                    (recovered_sender, recovered_receiver),
266                    (resolver_sender, resolver_receiver),
267                ),
268            );
269        }
270        registrations
271    }
272
273    /// Enum to describe the action to take when linking validators.
274    enum Action {
275        Link(Link),
276        Update(Link), // Unlink and then link
277        Unlink,
278    }
279
280    /// Links (or unlinks) validators using the oracle.
281    ///
282    /// The `action` parameter determines the action (e.g. link, unlink) to take.
283    /// The `restrict_to` function can be used to restrict the linking to certain connections,
284    /// otherwise all validators will be linked to all other validators.
285    async fn link_validators<P: PublicKey>(
286        oracle: &mut Oracle<P>,
287        validators: &[P],
288        action: Action,
289        restrict_to: Option<fn(usize, usize, usize) -> bool>,
290    ) {
291        for (i1, v1) in validators.iter().enumerate() {
292            for (i2, v2) in validators.iter().enumerate() {
293                // Ignore self
294                if v2 == v1 {
295                    continue;
296                }
297
298                // Restrict to certain connections
299                if let Some(f) = restrict_to {
300                    if !f(validators.len(), i1, i2) {
301                        continue;
302                    }
303                }
304
305                // Do any unlinking first
306                match action {
307                    Action::Update(_) | Action::Unlink => {
308                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
309                    }
310                    _ => {}
311                }
312
313                // Do any linking after
314                match action {
315                    Action::Link(ref link) | Action::Update(ref link) => {
316                        oracle
317                            .add_link(v1.clone(), v2.clone(), link.clone())
318                            .await
319                            .unwrap();
320                    }
321                    _ => {}
322                }
323            }
324        }
325    }
326
327    fn all_online<V: Variant>() {
328        // Create context
329        let n = 5;
330        let threshold = quorum(n);
331        let required_containers = 100;
332        let activity_timeout = 10;
333        let skip_timeout = 5;
334        let namespace = b"consensus".to_vec();
335        let executor = deterministic::Runner::timed(Duration::from_secs(30));
336        executor.start(|mut context| async move {
337            // Create simulated network
338            let (network, mut oracle) = Network::new(
339                context.with_label("network"),
340                Config {
341                    max_size: 1024 * 1024,
342                },
343            );
344
345            // Start network
346            network.start();
347
348            // Register participants
349            let mut schemes = Vec::new();
350            let mut validators = Vec::new();
351            for i in 0..n {
352                let scheme = PrivateKey::from_seed(i as u64);
353                let pk = scheme.public_key();
354                schemes.push(scheme);
355                validators.push(pk);
356            }
357            validators.sort();
358            schemes.sort_by_key(|s| s.public_key());
359            let mut registrations = register_validators(&mut oracle, &validators).await;
360
361            // Link all validators
362            let link = Link {
363                latency: 10.0,
364                jitter: 1.0,
365                success_rate: 1.0,
366            };
367            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
368
369            // Derive threshold
370            let (polynomial, shares) =
371                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
372
373            // Create engines
374            let relay = Arc::new(mocks::relay::Relay::new());
375            let mut supervisors = Vec::new();
376            let mut engine_handlers = Vec::new();
377            for (idx, scheme) in schemes.into_iter().enumerate() {
378                // Create scheme context
379                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
380
381                // Configure engine
382                let validator = scheme.public_key();
383                let mut participants = BTreeMap::new();
384                participants.insert(
385                    0,
386                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
387                );
388                let supervisor_config = mocks::supervisor::Config::<_, V> {
389                    namespace: namespace.clone(),
390                    participants,
391                };
392                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
393                supervisors.push(supervisor.clone());
394                let application_cfg = mocks::application::Config {
395                    hasher: Sha256::default(),
396                    relay: relay.clone(),
397                    participant: validator.clone(),
398                    propose_latency: (10.0, 5.0),
399                    verify_latency: (10.0, 5.0),
400                };
401                let (actor, application) = mocks::application::Application::new(
402                    context.with_label("application"),
403                    application_cfg,
404                );
405                actor.start();
406                let blocker = oracle.control(scheme.public_key());
407                let cfg = config::Config {
408                    crypto: scheme,
409                    blocker,
410                    automaton: application.clone(),
411                    relay: application.clone(),
412                    reporter: supervisor.clone(),
413                    supervisor,
414                    partition: validator.to_string(),
415                    compression: Some(3),
416                    mailbox_size: 1024,
417                    namespace: namespace.clone(),
418                    leader_timeout: Duration::from_secs(1),
419                    notarization_timeout: Duration::from_secs(2),
420                    nullify_retry: Duration::from_secs(10),
421                    fetch_timeout: Duration::from_secs(1),
422                    activity_timeout,
423                    skip_timeout,
424                    max_fetch_count: 1,
425                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
426                    fetch_concurrent: 1,
427                    replay_buffer: 1024 * 1024,
428                    write_buffer: 1024 * 1024,
429                };
430                let engine = Engine::new(context.with_label("engine"), cfg);
431
432                // Start engine
433                let (pending, recovered, resolver) = registrations
434                    .remove(&validator)
435                    .expect("validator should be registered");
436                engine_handlers.push(engine.start(pending, recovered, resolver));
437            }
438
439            // Wait for all engines to finish
440            let mut finalizers = Vec::new();
441            for supervisor in supervisors.iter_mut() {
442                let (mut latest, mut monitor) = supervisor.subscribe().await;
443                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
444                    while latest < required_containers {
445                        latest = monitor.next().await.expect("event missing");
446                    }
447                }));
448            }
449            join_all(finalizers).await;
450
451            // Check supervisors for correct activity
452            let latest_complete = required_containers - activity_timeout;
453            for supervisor in supervisors.iter() {
454                // Ensure no faults
455                {
456                    let faults = supervisor.faults.lock().unwrap();
457                    assert!(faults.is_empty());
458                }
459
460                // Ensure no invalid signatures
461                {
462                    let invalid = supervisor.invalid.lock().unwrap();
463                    assert_eq!(*invalid, 0);
464                }
465
466                // Ensure seeds for all views
467                {
468                    let seeds = supervisor.seeds.lock().unwrap();
469                    for view in 1..latest_complete {
470                        // Ensure seed for every view
471                        if !seeds.contains_key(&view) {
472                            panic!("view: {view}");
473                        }
474                    }
475                }
476
477                // Ensure no forks
478                let mut notarized = HashMap::new();
479                let mut finalized = HashMap::new();
480                {
481                    let notarizes = supervisor.notarizes.lock().unwrap();
482                    for view in 1..latest_complete {
483                        // Ensure only one payload proposed per view
484                        let Some(payloads) = notarizes.get(&view) else {
485                            continue;
486                        };
487                        if payloads.len() > 1 {
488                            panic!("view: {view}");
489                        }
490                        let (digest, notarizers) = payloads.iter().next().unwrap();
491                        notarized.insert(view, *digest);
492
493                        if notarizers.len() < threshold as usize {
494                            // We can't verify that everyone participated at every view because some nodes may
495                            // have started later.
496                            panic!("view: {view}");
497                        }
498                    }
499                }
500                {
501                    let notarizations = supervisor.notarizations.lock().unwrap();
502                    for view in 1..latest_complete {
503                        // Ensure notarization matches digest from notarizes
504                        let Some(notarization) = notarizations.get(&view) else {
505                            continue;
506                        };
507                        let Some(digest) = notarized.get(&view) else {
508                            continue;
509                        };
510                        assert_eq!(&notarization.proposal.payload, digest);
511                    }
512                }
513                {
514                    let finalizes = supervisor.finalizes.lock().unwrap();
515                    for view in 1..latest_complete {
516                        // Ensure only one payload proposed per view
517                        let Some(payloads) = finalizes.get(&view) else {
518                            continue;
519                        };
520                        if payloads.len() > 1 {
521                            panic!("view: {view}");
522                        }
523                        let (digest, finalizers) = payloads.iter().next().unwrap();
524                        finalized.insert(view, *digest);
525
526                        // Only check at views below timeout
527                        if view > latest_complete {
528                            continue;
529                        }
530
531                        // Ensure everyone participating
532                        if finalizers.len() < threshold as usize {
533                            // We can't verify that everyone participated at every view because some nodes may
534                            // have started later.
535                            panic!("view: {view}");
536                        }
537
538                        // Ensure no nullifies for any finalizers
539                        let nullifies = supervisor.nullifies.lock().unwrap();
540                        let Some(nullifies) = nullifies.get(&view) else {
541                            continue;
542                        };
543                        for (_, finalizers) in payloads.iter() {
544                            for finalizer in finalizers.iter() {
545                                if nullifies.contains(finalizer) {
546                                    panic!("should not nullify and finalize at same view");
547                                }
548                            }
549                        }
550                    }
551                }
552                {
553                    let finalizations = supervisor.finalizations.lock().unwrap();
554                    for view in 1..latest_complete {
555                        // Ensure finalization matches digest from finalizes
556                        let Some(finalization) = finalizations.get(&view) else {
557                            continue;
558                        };
559                        let Some(digest) = finalized.get(&view) else {
560                            continue;
561                        };
562                        assert_eq!(&finalization.proposal.payload, digest);
563                    }
564                }
565            }
566
567            // Ensure no blocked connections
568            let blocked = oracle.blocked().await.unwrap();
569            assert!(blocked.is_empty());
570        });
571    }
572
573    #[test_traced]
574    fn test_all_online() {
575        all_online::<MinPk>();
576        all_online::<MinSig>();
577    }
578
579    fn unclean_shutdown<V: Variant>() {
580        // Create context
581        let n = 5;
582        let threshold = quorum(n);
583        let required_containers = 100;
584        let activity_timeout = 10;
585        let skip_timeout = 5;
586        let namespace = b"consensus".to_vec();
587
588        // Derive threshold
589        let mut rng = StdRng::seed_from_u64(0);
590        let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);
591
592        // Random restarts every x seconds
593        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
594        let supervised = Arc::new(Mutex::new(Vec::new()));
595        let mut prev_ctx = None;
596
597        loop {
598            let namespace = namespace.clone();
599            let shutdowns = shutdowns.clone();
600            let supervised = supervised.clone();
601            let polynomial = polynomial.clone();
602            let shares = shares.clone();
603
604            let f = |mut context: deterministic::Context| async move {
605                // Create simulated network
606                let (network, mut oracle) = Network::new(
607                    context.with_label("network"),
608                    Config {
609                        max_size: 1024 * 1024,
610                    },
611                );
612
613                // Start network
614                network.start();
615
616                // Register participants
617                let mut schemes = Vec::new();
618                let mut validators = Vec::new();
619                for i in 0..n {
620                    let scheme = PrivateKey::from_seed(i as u64);
621                    let pk = scheme.public_key();
622                    schemes.push(scheme);
623                    validators.push(pk);
624                }
625                validators.sort();
626                schemes.sort_by_key(|s| s.public_key());
627                let mut registrations = register_validators(&mut oracle, &validators).await;
628
629                // Link all validators
630                let link = Link {
631                    latency: 50.0,
632                    jitter: 50.0,
633                    success_rate: 1.0,
634                };
635                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
636
637                // Create engines
638                let relay = Arc::new(mocks::relay::Relay::new());
639                let mut supervisors = HashMap::new();
640                let mut engine_handlers = Vec::new();
641                for (idx, scheme) in schemes.into_iter().enumerate() {
642                    // Create scheme context
643                    let context = context
644                        .clone()
645                        .with_label(&format!("validator-{}", scheme.public_key()));
646
647                    // Configure engine
648                    let validator = scheme.public_key();
649                    let mut participants = BTreeMap::new();
650                    participants.insert(
651                        0,
652                        (polynomial.clone(), validators.clone(), shares[idx].clone()),
653                    );
654                    let supervisor_config = mocks::supervisor::Config::<_, V> {
655                        namespace: namespace.clone(),
656                        participants,
657                    };
658                    let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
659                    supervisors.insert(validator.clone(), supervisor.clone());
660                    let application_cfg = mocks::application::Config {
661                        hasher: Sha256::default(),
662                        relay: relay.clone(),
663                        participant: validator.clone(),
664                        propose_latency: (10.0, 5.0),
665                        verify_latency: (10.0, 5.0),
666                    };
667                    let (actor, application) = mocks::application::Application::new(
668                        context.with_label("application"),
669                        application_cfg,
670                    );
671                    actor.start();
672                    let blocker = oracle.control(scheme.public_key());
673                    let cfg = config::Config {
674                        crypto: scheme,
675                        blocker,
676                        automaton: application.clone(),
677                        relay: application.clone(),
678                        reporter: supervisor.clone(),
679                        supervisor,
680                        partition: validator.to_string(),
681                        compression: Some(3),
682                        mailbox_size: 1024,
683                        namespace: namespace.clone(),
684                        leader_timeout: Duration::from_secs(1),
685                        notarization_timeout: Duration::from_secs(2),
686                        nullify_retry: Duration::from_secs(10),
687                        fetch_timeout: Duration::from_secs(1),
688                        activity_timeout,
689                        skip_timeout,
690                        max_fetch_count: 1,
691                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
692                        fetch_concurrent: 1,
693                        replay_buffer: 1024 * 1024,
694                        write_buffer: 1024 * 1024,
695                    };
696                    let engine = Engine::new(context.with_label("engine"), cfg);
697
698                    // Start engine
699                    let (pending, recovered, resolver) = registrations
700                        .remove(&validator)
701                        .expect("validator should be registered");
702                    engine_handlers.push(engine.start(pending, recovered, resolver));
703                }
704
705                // Store all finalizer handles
706                let mut finalizers = Vec::new();
707                for (_, supervisor) in supervisors.iter_mut() {
708                    let (mut latest, mut monitor) = supervisor.subscribe().await;
709                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
710                        while latest < required_containers {
711                            latest = monitor.next().await.expect("event missing");
712                        }
713                    }));
714                }
715
716                // Exit at random points for unclean shutdown of entire set
717                let wait =
718                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
719                let result = select! {
720                    _ = context.sleep(wait) => {
721                        // Collect supervisors to check faults
722                        {
723                            let mut shutdowns = shutdowns.lock().unwrap();
724                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
725                            *shutdowns += 1;
726                        }
727                        supervised.lock().unwrap().push(supervisors);
728                        (false,context)
729                    },
730                    _ = join_all(finalizers) => {
731                        // Check supervisors for faults activity
732                        let supervised = supervised.lock().unwrap();
733                        for supervisors in supervised.iter() {
734                            for (_, supervisor) in supervisors.iter() {
735                                let faults = supervisor.faults.lock().unwrap();
736                                assert!(faults.is_empty());
737                            }
738                        }
739                        (true,context)
740                    }
741                };
742
743                // Ensure no blocked connections
744                let blocked = oracle.blocked().await.unwrap();
745                assert!(blocked.is_empty());
746
747                result
748            };
749
750            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
751                deterministic::Runner::from(prev_ctx)
752            } else {
753                deterministic::Runner::timed(Duration::from_secs(30))
754            }
755            .start(f);
756
757            // Check if we should exit
758            if complete {
759                break;
760            }
761
762            prev_ctx = Some(context.recover());
763        }
764    }
765
766    #[test_traced]
767    fn test_unclean_shutdown() {
768        unclean_shutdown::<MinPk>();
769        unclean_shutdown::<MinSig>();
770    }
771
772    fn backfill<V: Variant>() {
773        // Create context
774        let n = 4;
775        let threshold = quorum(n);
776        let required_containers = 100;
777        let activity_timeout = 10;
778        let skip_timeout = 5;
779        let namespace = b"consensus".to_vec();
780        let executor = deterministic::Runner::timed(Duration::from_secs(720));
781        executor.start(|mut context| async move {
782            // Create simulated network
783            let (network, mut oracle) = Network::new(
784                context.with_label("network"),
785                Config {
786                    max_size: 1024 * 1024,
787                },
788            );
789
790            // Start network
791            network.start();
792
793            // Register participants
794            let mut schemes = Vec::new();
795            let mut validators = Vec::new();
796            for i in 0..n {
797                let scheme = PrivateKey::from_seed(i as u64);
798                let pk = scheme.public_key();
799                schemes.push(scheme);
800                validators.push(pk);
801            }
802            validators.sort();
803            schemes.sort_by_key(|s| s.public_key());
804            let mut registrations = register_validators(&mut oracle, &validators).await;
805
806            // Link all validators except first
807            let link = Link {
808                latency: 10.0,
809                jitter: 1.0,
810                success_rate: 1.0,
811            };
812            link_validators(
813                &mut oracle,
814                &validators,
815                Action::Link(link),
816                Some(|_, i, j| ![i, j].contains(&0usize)),
817            )
818            .await;
819
820            // Derive threshold
821            let (polynomial, shares) =
822                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
823
824            // Create engines
825            let relay = Arc::new(mocks::relay::Relay::new());
826            let mut supervisors = Vec::new();
827            let mut engine_handlers = Vec::new();
828            for (idx_scheme, scheme) in schemes.iter().enumerate() {
829                // Skip first peer
830                if idx_scheme == 0 {
831                    continue;
832                }
833
834                // Create scheme context
835                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
836
837                // Configure engine
838                let validator = scheme.public_key();
839                let mut participants = BTreeMap::new();
840                participants.insert(
841                    0,
842                    (
843                        polynomial.clone(),
844                        validators.clone(),
845                        shares[idx_scheme].clone(),
846                    ),
847                );
848                let supervisor_config = mocks::supervisor::Config {
849                    namespace: namespace.clone(),
850                    participants,
851                };
852                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
853                supervisors.push(supervisor.clone());
854                let application_cfg = mocks::application::Config {
855                    hasher: Sha256::default(),
856                    relay: relay.clone(),
857                    participant: validator.clone(),
858                    propose_latency: (10.0, 5.0),
859                    verify_latency: (10.0, 5.0),
860                };
861                let (actor, application) = mocks::application::Application::new(
862                    context.with_label("application"),
863                    application_cfg,
864                );
865                actor.start();
866                let blocker = oracle.control(scheme.public_key());
867                let cfg = config::Config {
868                    crypto: scheme.clone(),
869                    blocker,
870                    automaton: application.clone(),
871                    relay: application.clone(),
872                    reporter: supervisor.clone(),
873                    supervisor,
874                    partition: validator.to_string(),
875                    compression: Some(3),
876                    mailbox_size: 1024,
877                    namespace: namespace.clone(),
878                    leader_timeout: Duration::from_secs(1),
879                    notarization_timeout: Duration::from_secs(2),
880                    nullify_retry: Duration::from_secs(10),
881                    fetch_timeout: Duration::from_secs(1),
882                    activity_timeout,
883                    skip_timeout,
884                    max_fetch_count: 1, // force many fetches
885                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
886                    fetch_concurrent: 1,
887                    replay_buffer: 1024 * 1024,
888                    write_buffer: 1024 * 1024,
889                };
890                let engine = Engine::new(context.with_label("engine"), cfg);
891
892                // Start engine
893                let (pending, recovered, resolver) = registrations
894                    .remove(&validator)
895                    .expect("validator should be registered");
896                engine_handlers.push(engine.start(pending, recovered, resolver));
897            }
898
899            // Wait for all engines to finish
900            let mut finalizers = Vec::new();
901            for supervisor in supervisors.iter_mut() {
902                let (mut latest, mut monitor) = supervisor.subscribe().await;
903                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
904                    while latest < required_containers {
905                        latest = monitor.next().await.expect("event missing");
906                    }
907                }));
908            }
909            join_all(finalizers).await;
910
911            // Degrade network connections for online peers
912            let link = Link {
913                latency: 3_000.0,
914                jitter: 0.0,
915                success_rate: 1.0,
916            };
917            link_validators(
918                &mut oracle,
919                &validators,
920                Action::Update(link.clone()),
921                Some(|_, i, j| ![i, j].contains(&0usize)),
922            )
923            .await;
924
925            // Wait for nullifications to accrue
926            context.sleep(Duration::from_secs(120)).await;
927
928            // Unlink second peer from all (except first)
929            link_validators(
930                &mut oracle,
931                &validators,
932                Action::Unlink,
933                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
934            )
935            .await;
936
937            // Configure engine for first peer
938            let scheme = schemes[0].clone();
939            let validator = scheme.public_key();
940            let context = context.with_label(&format!("validator-{validator}"));
941
942            // Link first peer to all (except second)
943            link_validators(
944                &mut oracle,
945                &validators,
946                Action::Link(link),
947                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
948            )
949            .await;
950
951            // Restore network connections for all online peers
952            let link = Link {
953                latency: 10.0,
954                jitter: 2.5,
955                success_rate: 1.0,
956            };
957            link_validators(
958                &mut oracle,
959                &validators,
960                Action::Update(link),
961                Some(|_, i, j| ![i, j].contains(&1usize)),
962            )
963            .await;
964
965            // Configure engine
966            let mut participants = BTreeMap::new();
967            participants.insert(
968                0,
969                (polynomial.clone(), validators.clone(), shares[0].clone()),
970            );
971            let supervisor_config = mocks::supervisor::Config::<_, V> {
972                namespace: namespace.clone(),
973                participants,
974            };
975            let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
976            supervisors.push(supervisor.clone());
977            let application_cfg = mocks::application::Config {
978                hasher: Sha256::default(),
979                relay: relay.clone(),
980                participant: validator.clone(),
981                propose_latency: (10.0, 5.0),
982                verify_latency: (10.0, 5.0),
983            };
984            let (actor, application) = mocks::application::Application::new(
985                context.with_label("application"),
986                application_cfg,
987            );
988            actor.start();
989            let blocker = oracle.control(scheme.public_key());
990            let cfg = config::Config {
991                crypto: scheme,
992                blocker,
993                automaton: application.clone(),
994                relay: application.clone(),
995                reporter: supervisor.clone(),
996                supervisor: supervisor.clone(),
997                partition: validator.to_string(),
998                compression: Some(3),
999                mailbox_size: 1024,
1000                namespace: namespace.clone(),
1001                leader_timeout: Duration::from_secs(1),
1002                notarization_timeout: Duration::from_secs(2),
1003                nullify_retry: Duration::from_secs(10),
1004                fetch_timeout: Duration::from_secs(1),
1005                activity_timeout,
1006                skip_timeout,
1007                max_fetch_count: 1,
1008                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1009                fetch_concurrent: 1,
1010                replay_buffer: 1024 * 1024,
1011                write_buffer: 1024 * 1024,
1012            };
1013            let engine = Engine::new(context.with_label("engine"), cfg);
1014
1015            // Start engine
1016            let (pending, recovered, resolver) = registrations
1017                .remove(&validator)
1018                .expect("validator should be registered");
1019            engine_handlers.push(engine.start(pending, recovered, resolver));
1020
1021            // Wait for new engine to finalize required
1022            let (mut latest, mut monitor) = supervisor.subscribe().await;
1023            while latest < required_containers {
1024                latest = monitor.next().await.expect("event missing");
1025            }
1026
1027            // Ensure no blocked connections
1028            let blocked = oracle.blocked().await.unwrap();
1029            assert!(blocked.is_empty());
1030        });
1031    }
1032
1033    #[test_traced]
1034    fn test_backfill() {
1035        backfill::<MinPk>();
1036        backfill::<MinSig>();
1037    }
1038
1039    fn one_offline<V: Variant>() {
1040        // Create context
1041        let n = 5;
1042        let threshold = quorum(n);
1043        let required_containers = 100;
1044        let activity_timeout = 10;
1045        let skip_timeout = 5;
1046        let max_exceptions = 10;
1047        let namespace = b"consensus".to_vec();
1048        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1049        executor.start(|mut context| async move {
1050            // Create simulated network
1051            let (network, mut oracle) = Network::new(
1052                context.with_label("network"),
1053                Config {
1054                    max_size: 1024 * 1024,
1055                },
1056            );
1057
1058            // Start network
1059            network.start();
1060
1061            // Register participants
1062            let mut schemes = Vec::new();
1063            let mut validators = Vec::new();
1064            for i in 0..n {
1065                let scheme = PrivateKey::from_seed(i as u64);
1066                let pk = scheme.public_key();
1067                schemes.push(scheme);
1068                validators.push(pk);
1069            }
1070            validators.sort();
1071            schemes.sort_by_key(|s| s.public_key());
1072            let mut registrations = register_validators(&mut oracle, &validators).await;
1073
1074            // Link all validators except first
1075            let link = Link {
1076                latency: 10.0,
1077                jitter: 1.0,
1078                success_rate: 1.0,
1079            };
1080            link_validators(
1081                &mut oracle,
1082                &validators,
1083                Action::Link(link),
1084                Some(|_, i, j| ![i, j].contains(&0usize)),
1085            )
1086            .await;
1087
1088            // Derive threshold
1089            let (polynomial, shares) =
1090                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1091
1092            // Create engines
1093            let relay = Arc::new(mocks::relay::Relay::new());
1094            let mut supervisors = Vec::new();
1095            let mut engine_handlers = Vec::new();
1096            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1097                // Skip first peer
1098                if idx_scheme == 0 {
1099                    continue;
1100                }
1101
1102                // Create scheme context
1103                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1104
1105                // Configure engine
1106                let validator = scheme.public_key();
1107                let mut participants = BTreeMap::new();
1108                participants.insert(
1109                    0,
1110                    (
1111                        polynomial.clone(),
1112                        validators.clone(),
1113                        shares[idx_scheme].clone(),
1114                    ),
1115                );
1116                let supervisor_config = mocks::supervisor::Config::<_, V> {
1117                    namespace: namespace.clone(),
1118                    participants,
1119                };
1120                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1121                supervisors.push(supervisor.clone());
1122                let application_cfg = mocks::application::Config {
1123                    hasher: Sha256::default(),
1124                    relay: relay.clone(),
1125                    participant: validator.clone(),
1126                    propose_latency: (10.0, 5.0),
1127                    verify_latency: (10.0, 5.0),
1128                };
1129                let (actor, application) = mocks::application::Application::new(
1130                    context.with_label("application"),
1131                    application_cfg,
1132                );
1133                actor.start();
1134                let blocker = oracle.control(scheme.public_key());
1135                let cfg = config::Config {
1136                    crypto: scheme,
1137                    blocker,
1138                    automaton: application.clone(),
1139                    relay: application.clone(),
1140                    reporter: supervisor.clone(),
1141                    supervisor,
1142                    partition: validator.to_string(),
1143                    compression: Some(3),
1144                    mailbox_size: 1024,
1145                    namespace: namespace.clone(),
1146                    leader_timeout: Duration::from_secs(1),
1147                    notarization_timeout: Duration::from_secs(2),
1148                    nullify_retry: Duration::from_secs(10),
1149                    fetch_timeout: Duration::from_secs(1),
1150                    activity_timeout,
1151                    skip_timeout,
1152                    max_fetch_count: 1,
1153                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1154                    fetch_concurrent: 1,
1155                    replay_buffer: 1024 * 1024,
1156                    write_buffer: 1024 * 1024,
1157                };
1158                let engine = Engine::new(context.with_label("engine"), cfg);
1159
1160                // Start engine
1161                let (pending, recovered, resolver) = registrations
1162                    .remove(&validator)
1163                    .expect("validator should be registered");
1164                engine_handlers.push(engine.start(pending, recovered, resolver));
1165            }
1166
1167            // Wait for all engines to finish
1168            let mut finalizers = Vec::new();
1169            for supervisor in supervisors.iter_mut() {
1170                let (mut latest, mut monitor) = supervisor.subscribe().await;
1171                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1172                    while latest < required_containers {
1173                        latest = monitor.next().await.expect("event missing");
1174                    }
1175                }));
1176            }
1177            join_all(finalizers).await;
1178
1179            // Check supervisors for correct activity
1180            let exceptions = 0;
1181            let offline = &validators[0];
1182            for supervisor in supervisors.iter() {
1183                // Ensure no faults
1184                {
1185                    let faults = supervisor.faults.lock().unwrap();
1186                    assert!(faults.is_empty());
1187                }
1188
1189                // Ensure no invalid signatures
1190                {
1191                    let invalid = supervisor.invalid.lock().unwrap();
1192                    assert_eq!(*invalid, 0);
1193                }
1194
1195                // Ensure offline node is never active
1196                let mut exceptions = 0;
1197                {
1198                    let notarizes = supervisor.notarizes.lock().unwrap();
1199                    for (view, payloads) in notarizes.iter() {
1200                        for (_, participants) in payloads.iter() {
1201                            if participants.contains(offline) {
1202                                panic!("view: {view}");
1203                            }
1204                        }
1205                    }
1206                }
1207                {
1208                    let nullifies = supervisor.nullifies.lock().unwrap();
1209                    for (view, participants) in nullifies.iter() {
1210                        if participants.contains(offline) {
1211                            panic!("view: {view}");
1212                        }
1213                    }
1214                }
1215                {
1216                    let finalizes = supervisor.finalizes.lock().unwrap();
1217                    for (view, payloads) in finalizes.iter() {
1218                        for (_, finalizers) in payloads.iter() {
1219                            if finalizers.contains(offline) {
1220                                panic!("view: {view}");
1221                            }
1222                        }
1223                    }
1224                }
1225
1226                // Identify offline views
1227                let mut offline_views = Vec::new();
1228                {
1229                    let leaders = supervisor.leaders.lock().unwrap();
1230                    for (view, leader) in leaders.iter() {
1231                        if leader == offline {
1232                            offline_views.push(*view);
1233                        }
1234                    }
1235                }
1236                assert!(!offline_views.is_empty());
1237
1238                // Ensure nullifies/nullification collected for offline node
1239                {
1240                    let nullifies = supervisor.nullifies.lock().unwrap();
1241                    for view in offline_views.iter() {
1242                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1243                        if nullifies < threshold as usize {
1244                            warn!("missing expected view nullifies: {}", view);
1245                            exceptions += 1;
1246                        }
1247                    }
1248                }
1249                {
1250                    let nullifications = supervisor.nullifications.lock().unwrap();
1251                    for view in offline_views.iter() {
1252                        if !nullifications.contains_key(view) {
1253                            warn!("missing expected view nullifies: {}", view);
1254                            exceptions += 1;
1255                        }
1256                    }
1257                }
1258
1259                // Ensure exceptions within allowed
1260                assert!(exceptions <= max_exceptions);
1261            }
1262            assert!(exceptions <= max_exceptions);
1263
1264            // Ensure no blocked connections
1265            let blocked = oracle.blocked().await.unwrap();
1266            assert!(blocked.is_empty());
1267
1268            // Ensure we are skipping views
1269            let encoded = context.encode();
1270            let lines = encoded.lines();
1271            let mut skipped_views = 0;
1272            let mut nodes_skipping = 0;
1273            for line in lines {
1274                if line.contains("_engine_voter_skipped_views_total") {
1275                    let parts: Vec<&str> = line.split_whitespace().collect();
1276                    if let Some(number_str) = parts.last() {
1277                        if let Ok(number) = number_str.parse::<u64>() {
1278                            if number > 0 {
1279                                nodes_skipping += 1;
1280                            }
1281                            if number > skipped_views {
1282                                skipped_views = number;
1283                            }
1284                        }
1285                    }
1286                }
1287            }
1288            assert!(
1289                skipped_views > 0,
1290                "expected skipped views to be greater than 0"
1291            );
1292            assert_eq!(
1293                nodes_skipping,
1294                n - 1,
1295                "expected all online nodes to be skipping views"
1296            );
1297        });
1298    }
1299
1300    #[test_traced]
1301    fn test_one_offline() {
1302        one_offline::<MinPk>();
1303        one_offline::<MinSig>();
1304    }
1305
1306    fn slow_validator<V: Variant>() {
1307        // Create context
1308        let n = 5;
1309        let threshold = quorum(n);
1310        let required_containers = 50;
1311        let activity_timeout = 10;
1312        let skip_timeout = 5;
1313        let namespace = b"consensus".to_vec();
1314        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1315        executor.start(|mut context| async move {
1316            // Create simulated network
1317            let (network, mut oracle) = Network::new(
1318                context.with_label("network"),
1319                Config {
1320                    max_size: 1024 * 1024,
1321                },
1322            );
1323
1324            // Start network
1325            network.start();
1326
1327            // Register participants
1328            let mut schemes = Vec::new();
1329            let mut validators = Vec::new();
1330            for i in 0..n {
1331                let scheme = PrivateKey::from_seed(i as u64);
1332                let pk = scheme.public_key();
1333                schemes.push(scheme);
1334                validators.push(pk);
1335            }
1336            validators.sort();
1337            schemes.sort_by_key(|s| s.public_key());
1338            let mut registrations = register_validators(&mut oracle, &validators).await;
1339
1340            // Link all validators
1341            let link = Link {
1342                latency: 10.0,
1343                jitter: 1.0,
1344                success_rate: 1.0,
1345            };
1346            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1347
1348            // Derive threshold
1349            let (polynomial, shares) =
1350                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1351
1352            // Create engines
1353            let relay = Arc::new(mocks::relay::Relay::new());
1354            let mut supervisors = Vec::new();
1355            let mut engine_handlers = Vec::new();
1356            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1357                // Create scheme context
1358                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1359
1360                // Configure engine
1361                let validator = scheme.public_key();
1362                let mut participants = BTreeMap::new();
1363                participants.insert(
1364                    0,
1365                    (
1366                        polynomial.clone(),
1367                        validators.clone(),
1368                        shares[idx_scheme].clone(),
1369                    ),
1370                );
1371                let supervisor_config = mocks::supervisor::Config::<_, V> {
1372                    namespace: namespace.clone(),
1373                    participants,
1374                };
1375                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1376                supervisors.push(supervisor.clone());
1377                let application_cfg = if idx_scheme == 0 {
1378                    mocks::application::Config {
1379                        hasher: Sha256::default(),
1380                        relay: relay.clone(),
1381                        participant: validator.clone(),
1382                        propose_latency: (10_000.0, 0.0),
1383                        verify_latency: (10_000.0, 5.0),
1384                    }
1385                } else {
1386                    mocks::application::Config {
1387                        hasher: Sha256::default(),
1388                        relay: relay.clone(),
1389                        participant: validator.clone(),
1390                        propose_latency: (10.0, 5.0),
1391                        verify_latency: (10.0, 5.0),
1392                    }
1393                };
1394                let (actor, application) = mocks::application::Application::new(
1395                    context.with_label("application"),
1396                    application_cfg,
1397                );
1398                actor.start();
1399                let blocker = oracle.control(scheme.public_key());
1400                let cfg = config::Config {
1401                    crypto: scheme,
1402                    blocker,
1403                    automaton: application.clone(),
1404                    relay: application.clone(),
1405                    reporter: supervisor.clone(),
1406                    supervisor,
1407                    partition: validator.to_string(),
1408                    compression: Some(3),
1409                    mailbox_size: 1024,
1410                    namespace: namespace.clone(),
1411                    leader_timeout: Duration::from_secs(1),
1412                    notarization_timeout: Duration::from_secs(2),
1413                    nullify_retry: Duration::from_secs(10),
1414                    fetch_timeout: Duration::from_secs(1),
1415                    activity_timeout,
1416                    skip_timeout,
1417                    max_fetch_count: 1,
1418                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1419                    fetch_concurrent: 1,
1420                    replay_buffer: 1024 * 1024,
1421                    write_buffer: 1024 * 1024,
1422                };
1423                let engine = Engine::new(context.with_label("engine"), cfg);
1424
1425                // Start engine
1426                let (pending, recovered, resolver) = registrations
1427                    .remove(&validator)
1428                    .expect("validator should be registered");
1429                engine_handlers.push(engine.start(pending, recovered, resolver));
1430            }
1431
1432            // Wait for all engines to finish
1433            let mut finalizers = Vec::new();
1434            for supervisor in supervisors.iter_mut() {
1435                let (mut latest, mut monitor) = supervisor.subscribe().await;
1436                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1437                    while latest < required_containers {
1438                        latest = monitor.next().await.expect("event missing");
1439                    }
1440                }));
1441            }
1442            join_all(finalizers).await;
1443
1444            // Check supervisors for correct activity
1445            let slow = &validators[0];
1446            for supervisor in supervisors.iter() {
1447                // Ensure no faults
1448                {
1449                    let faults = supervisor.faults.lock().unwrap();
1450                    assert!(faults.is_empty());
1451                }
1452
1453                // Ensure no invalid signatures
1454                {
1455                    let invalid = supervisor.invalid.lock().unwrap();
1456                    assert_eq!(*invalid, 0);
1457                }
1458
1459                // Ensure slow node never emits a notarize or finalize (will never finish verification in a timely manner)
1460                {
1461                    let notarizes = supervisor.notarizes.lock().unwrap();
1462                    for (view, payloads) in notarizes.iter() {
1463                        for (_, participants) in payloads.iter() {
1464                            if participants.contains(slow) {
1465                                panic!("view: {view}");
1466                            }
1467                        }
1468                    }
1469                }
1470                {
1471                    let finalizes = supervisor.finalizes.lock().unwrap();
1472                    for (view, payloads) in finalizes.iter() {
1473                        for (_, finalizers) in payloads.iter() {
1474                            if finalizers.contains(slow) {
1475                                panic!("view: {view}");
1476                            }
1477                        }
1478                    }
1479                }
1480            }
1481
1482            // Ensure no blocked connections
1483            let blocked = oracle.blocked().await.unwrap();
1484            assert!(blocked.is_empty());
1485        });
1486    }
1487
1488    #[test_traced]
1489    fn test_slow_validator() {
1490        slow_validator::<MinPk>();
1491        slow_validator::<MinSig>();
1492    }
1493
1494    fn all_recovery<V: Variant>() {
1495        // Create context
1496        let n = 5;
1497        let threshold = quorum(n);
1498        let required_containers = 100;
1499        let activity_timeout = 10;
1500        let skip_timeout = 2;
1501        let namespace = b"consensus".to_vec();
1502        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1503        executor.start(|mut context| async move {
1504            // Create simulated network
1505            let (network, mut oracle) = Network::new(
1506                context.with_label("network"),
1507                Config {
1508                    max_size: 1024 * 1024,
1509                },
1510            );
1511
1512            // Start network
1513            network.start();
1514
1515            // Register participants
1516            let mut schemes = Vec::new();
1517            let mut validators = Vec::new();
1518            for i in 0..n {
1519                let scheme = PrivateKey::from_seed(i as u64);
1520                let pk = scheme.public_key();
1521                schemes.push(scheme);
1522                validators.push(pk);
1523            }
1524            validators.sort();
1525            schemes.sort_by_key(|s| s.public_key());
1526            let mut registrations = register_validators(&mut oracle, &validators).await;
1527
1528            // Link all validators
1529            let link = Link {
1530                latency: 3_000.0,
1531                jitter: 0.0,
1532                success_rate: 1.0,
1533            };
1534            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1535
1536            // Derive threshold
1537            let (polynomial, shares) =
1538                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1539
1540            // Create engines
1541            let relay = Arc::new(mocks::relay::Relay::new());
1542            let mut supervisors = Vec::new();
1543            let mut engine_handlers = Vec::new();
1544            for (idx, scheme) in schemes.iter().enumerate() {
1545                // Create scheme context
1546                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1547
1548                // Configure engine
1549                let validator = scheme.public_key();
1550                let mut participants = BTreeMap::new();
1551                participants.insert(
1552                    0,
1553                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
1554                );
1555                let supervisor_config = mocks::supervisor::Config::<_, V> {
1556                    namespace: namespace.clone(),
1557                    participants,
1558                };
1559                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1560                supervisors.push(supervisor.clone());
1561                let application_cfg = mocks::application::Config {
1562                    hasher: Sha256::default(),
1563                    relay: relay.clone(),
1564                    participant: validator.clone(),
1565                    propose_latency: (10.0, 5.0),
1566                    verify_latency: (10.0, 5.0),
1567                };
1568                let (actor, application) = mocks::application::Application::new(
1569                    context.with_label("application"),
1570                    application_cfg,
1571                );
1572                actor.start();
1573                let blocker = oracle.control(scheme.public_key());
1574                let cfg = config::Config {
1575                    crypto: scheme.clone(),
1576                    blocker,
1577                    automaton: application.clone(),
1578                    relay: application.clone(),
1579                    reporter: supervisor.clone(),
1580                    supervisor,
1581                    partition: validator.to_string(),
1582                    compression: Some(3),
1583                    mailbox_size: 1024,
1584                    namespace: namespace.clone(),
1585                    leader_timeout: Duration::from_secs(1),
1586                    notarization_timeout: Duration::from_secs(2),
1587                    nullify_retry: Duration::from_secs(10),
1588                    fetch_timeout: Duration::from_secs(1),
1589                    activity_timeout,
1590                    skip_timeout,
1591                    max_fetch_count: 1,
1592                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1593                    fetch_concurrent: 1,
1594                    replay_buffer: 1024 * 1024,
1595                    write_buffer: 1024 * 1024,
1596                };
1597                let engine = Engine::new(context.with_label("engine"), cfg);
1598
1599                // Start engine
1600                let (pending, recovered, resolver) = registrations
1601                    .remove(&validator)
1602                    .expect("validator should be registered");
1603                engine_handlers.push(engine.start(pending, recovered, resolver));
1604            }
1605
1606            // Wait for a few virtual minutes (shouldn't finalize anything)
1607            let mut finalizers = Vec::new();
1608            for supervisor in supervisors.iter_mut() {
1609                let (_, mut monitor) = supervisor.subscribe().await;
1610                finalizers.push(
1611                    context
1612                        .with_label("finalizer")
1613                        .spawn(move |context| async move {
1614                            select! {
1615                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1616                                _done = monitor.next() => {
1617                                    panic!("engine should not notarize or finalize anything");
1618                                }
1619                            }
1620                        }),
1621                );
1622            }
1623            join_all(finalizers).await;
1624
1625            // Unlink all validators to get latest view
1626            link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1627
1628            // Wait for a virtual minute (nothing should happen)
1629            context.sleep(Duration::from_secs(60)).await;
1630
1631            // Get latest view
1632            let mut latest = 0;
1633            for supervisor in supervisors.iter() {
1634                let nullifies = supervisor.nullifies.lock().unwrap();
1635                let max = nullifies.keys().max().unwrap();
1636                if *max > latest {
1637                    latest = *max;
1638                }
1639            }
1640
1641            // Update links
1642            let link = Link {
1643                latency: 10.0,
1644                jitter: 1.0,
1645                success_rate: 1.0,
1646            };
1647            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1648
1649            // Wait for all engines to finish
1650            let mut finalizers = Vec::new();
1651            for supervisor in supervisors.iter_mut() {
1652                let (mut latest, mut monitor) = supervisor.subscribe().await;
1653                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1654                    while latest < required_containers {
1655                        latest = monitor.next().await.expect("event missing");
1656                    }
1657                }));
1658            }
1659            join_all(finalizers).await;
1660
1661            // Check supervisors for correct activity
1662            for supervisor in supervisors.iter() {
1663                // Ensure no faults
1664                {
1665                    let faults = supervisor.faults.lock().unwrap();
1666                    assert!(faults.is_empty());
1667                }
1668
1669                // Ensure no invalid signatures
1670                {
1671                    let invalid = supervisor.invalid.lock().unwrap();
1672                    assert_eq!(*invalid, 0);
1673                }
1674
1675                // Ensure quick recovery.
1676                //
1677                // If the skip timeout isn't implemented correctly, we may go many views before participants
1678                // start to consider a validator's proposal.
1679                {
1680                    // Ensure nearly all views around latest finalize
1681                    let mut found = 0;
1682                    let finalizations = supervisor.finalizations.lock().unwrap();
1683                    for i in latest..latest + activity_timeout {
1684                        if finalizations.contains_key(&i) {
1685                            found += 1;
1686                        }
1687                    }
1688                    assert!(found >= activity_timeout - 2, "found: {found}");
1689                }
1690            }
1691
1692            // Ensure no blocked connections
1693            let blocked = oracle.blocked().await.unwrap();
1694            assert!(blocked.is_empty());
1695        });
1696    }
1697
1698    #[test_traced]
1699    fn test_all_recovery() {
1700        all_recovery::<MinPk>();
1701        all_recovery::<MinSig>();
1702    }
1703
1704    fn partition<V: Variant>() {
1705        // Create context
1706        let n = 10;
1707        let threshold = quorum(n);
1708        let required_containers = 50;
1709        let activity_timeout = 10;
1710        let skip_timeout = 5;
1711        let namespace = b"consensus".to_vec();
1712        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1713        executor.start(|mut context| async move {
1714            // Create simulated network
1715            let (network, mut oracle) = Network::new(
1716                context.with_label("network"),
1717                Config {
1718                    max_size: 1024 * 1024,
1719                },
1720            );
1721
1722            // Start network
1723            network.start();
1724
1725            // Register participants
1726            let mut schemes = Vec::new();
1727            let mut validators = Vec::new();
1728            for i in 0..n {
1729                let scheme = PrivateKey::from_seed(i as u64);
1730                let pk = scheme.public_key();
1731                schemes.push(scheme);
1732                validators.push(pk);
1733            }
1734            validators.sort();
1735            schemes.sort_by_key(|s| s.public_key());
1736            let mut registrations = register_validators(&mut oracle, &validators).await;
1737
1738            // Link all validators
1739            let link = Link {
1740                latency: 10.0,
1741                jitter: 1.0,
1742                success_rate: 1.0,
1743            };
1744            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1745
1746            // Derive threshold
1747            let (polynomial, shares) =
1748                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1749
1750            // Create engines
1751            let relay = Arc::new(mocks::relay::Relay::new());
1752            let mut supervisors = Vec::new();
1753            let mut engine_handlers = Vec::new();
1754            for (idx, scheme) in schemes.iter().enumerate() {
1755                // Create scheme context
1756                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1757
1758                // Configure engine
1759                let validator = scheme.public_key();
1760                let mut participants = BTreeMap::new();
1761                participants.insert(
1762                    0,
1763                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
1764                );
1765                let supervisor_config = mocks::supervisor::Config::<_, V> {
1766                    namespace: namespace.clone(),
1767                    participants,
1768                };
1769                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1770                supervisors.push(supervisor.clone());
1771                let application_cfg = mocks::application::Config {
1772                    hasher: Sha256::default(),
1773                    relay: relay.clone(),
1774                    participant: validator.clone(),
1775                    propose_latency: (10.0, 5.0),
1776                    verify_latency: (10.0, 5.0),
1777                };
1778                let (actor, application) = mocks::application::Application::new(
1779                    context.with_label("application"),
1780                    application_cfg,
1781                );
1782                actor.start();
1783                let blocker = oracle.control(scheme.public_key());
1784                let cfg = config::Config {
1785                    crypto: scheme.clone(),
1786                    blocker,
1787                    automaton: application.clone(),
1788                    relay: application.clone(),
1789                    reporter: supervisor.clone(),
1790                    supervisor,
1791                    partition: validator.to_string(),
1792                    compression: Some(3),
1793                    mailbox_size: 1024,
1794                    namespace: namespace.clone(),
1795                    leader_timeout: Duration::from_secs(1),
1796                    notarization_timeout: Duration::from_secs(2),
1797                    nullify_retry: Duration::from_secs(10),
1798                    fetch_timeout: Duration::from_secs(1),
1799                    activity_timeout,
1800                    skip_timeout,
1801                    max_fetch_count: 1,
1802                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1803                    fetch_concurrent: 1,
1804                    replay_buffer: 1024 * 1024,
1805                    write_buffer: 1024 * 1024,
1806                };
1807                let engine = Engine::new(context.with_label("engine"), cfg);
1808
1809                // Start engine
1810                let (pending, recovered, resolver) = registrations
1811                    .remove(&validator)
1812                    .expect("validator should be registered");
1813                engine_handlers.push(engine.start(pending, recovered, resolver));
1814            }
1815
1816            // Wait for all engines to finish
1817            let mut finalizers = Vec::new();
1818            for supervisor in supervisors.iter_mut() {
1819                let (mut latest, mut monitor) = supervisor.subscribe().await;
1820                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1821                    while latest < required_containers {
1822                        latest = monitor.next().await.expect("event missing");
1823                    }
1824                }));
1825            }
1826            join_all(finalizers).await;
1827
1828            // Cut all links between validator halves
1829            fn separated(n: usize, a: usize, b: usize) -> bool {
1830                let m = n / 2;
1831                (a < m && b >= m) || (a >= m && b < m)
1832            }
1833            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1834
1835            // Wait for any in-progress notarizations/finalizations to finish
1836            context.sleep(Duration::from_secs(10)).await;
1837
1838            // Wait for a few virtual minutes (shouldn't finalize anything)
1839            let mut finalizers = Vec::new();
1840            for supervisor in supervisors.iter_mut() {
1841                let (_, mut monitor) = supervisor.subscribe().await;
1842                finalizers.push(
1843                    context
1844                        .with_label("finalizer")
1845                        .spawn(move |context| async move {
1846                            select! {
1847                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1848                                _done = monitor.next() => {
1849                                    panic!("engine should not notarize or finalize anything");
1850                                }
1851                            }
1852                        }),
1853                );
1854            }
1855            join_all(finalizers).await;
1856
1857            // Restore links
1858            link_validators(
1859                &mut oracle,
1860                &validators,
1861                Action::Link(link),
1862                Some(separated),
1863            )
1864            .await;
1865
1866            // Wait for all engines to finish
1867            let mut finalizers = Vec::new();
1868            for supervisor in supervisors.iter_mut() {
1869                let (mut latest, mut monitor) = supervisor.subscribe().await;
1870                let required = latest + required_containers;
1871                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1872                    while latest < required {
1873                        latest = monitor.next().await.expect("event missing");
1874                    }
1875                }));
1876            }
1877            join_all(finalizers).await;
1878
1879            // Check supervisors for correct activity
1880            for supervisor in supervisors.iter() {
1881                // Ensure no faults
1882                {
1883                    let faults = supervisor.faults.lock().unwrap();
1884                    assert!(faults.is_empty());
1885                }
1886
1887                // Ensure no invalid signatures
1888                {
1889                    let invalid = supervisor.invalid.lock().unwrap();
1890                    assert_eq!(*invalid, 0);
1891                }
1892            }
1893
1894            // Ensure no blocked connections
1895            let blocked = oracle.blocked().await.unwrap();
1896            assert!(blocked.is_empty());
1897        });
1898    }
1899
1900    #[test_traced]
1901    #[ignore]
1902    fn test_partition() {
1903        partition::<MinPk>();
1904        partition::<MinSig>();
1905    }
1906
1907    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
1908        // Create context
1909        let n = 5;
1910        let threshold = quorum(n);
1911        let required_containers = 50;
1912        let activity_timeout = 10;
1913        let skip_timeout = 5;
1914        let namespace = b"consensus".to_vec();
1915        let cfg = deterministic::Config::new()
1916            .with_seed(seed)
1917            .with_timeout(Some(Duration::from_secs(5_000)));
1918        let executor = deterministic::Runner::new(cfg);
1919        executor.start(|mut context| async move {
1920            // Create simulated network
1921            let (network, mut oracle) = Network::new(
1922                context.with_label("network"),
1923                Config {
1924                    max_size: 1024 * 1024,
1925                },
1926            );
1927
1928            // Start network
1929            network.start();
1930
1931            // Register participants
1932            let mut schemes = Vec::new();
1933            let mut validators = Vec::new();
1934            for i in 0..n {
1935                let scheme = PrivateKey::from_seed(i as u64);
1936                let pk = scheme.public_key();
1937                schemes.push(scheme);
1938                validators.push(pk);
1939            }
1940            validators.sort();
1941            schemes.sort_by_key(|s| s.public_key());
1942            let mut registrations = register_validators(&mut oracle, &validators).await;
1943
1944            // Link all validators
1945            let degraded_link = Link {
1946                latency: 200.0,
1947                jitter: 150.0,
1948                success_rate: 0.5,
1949            };
1950            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1951
1952            // Derive threshold
1953            let (polynomial, shares) =
1954                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1955
1956            // Create engines
1957            let relay = Arc::new(mocks::relay::Relay::new());
1958            let mut supervisors = Vec::new();
1959            let mut engine_handlers = Vec::new();
1960            for (idx, scheme) in schemes.into_iter().enumerate() {
1961                // Create scheme context
1962                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1963
1964                // Configure engine
1965                let validator = scheme.public_key();
1966                let mut participants = BTreeMap::new();
1967                participants.insert(
1968                    0,
1969                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
1970                );
1971                let supervisor_config = mocks::supervisor::Config::<_, V> {
1972                    namespace: namespace.clone(),
1973                    participants,
1974                };
1975                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1976                supervisors.push(supervisor.clone());
1977                let application_cfg = mocks::application::Config {
1978                    hasher: Sha256::default(),
1979                    relay: relay.clone(),
1980                    participant: validator.clone(),
1981                    propose_latency: (10.0, 5.0),
1982                    verify_latency: (10.0, 5.0),
1983                };
1984                let (actor, application) = mocks::application::Application::new(
1985                    context.with_label("application"),
1986                    application_cfg,
1987                );
1988                actor.start();
1989                let blocker = oracle.control(scheme.public_key());
1990                let cfg = config::Config {
1991                    crypto: scheme,
1992                    blocker,
1993                    automaton: application.clone(),
1994                    relay: application.clone(),
1995                    reporter: supervisor.clone(),
1996                    supervisor,
1997                    partition: validator.to_string(),
1998                    compression: Some(3),
1999                    mailbox_size: 1024,
2000                    namespace: namespace.clone(),
2001                    leader_timeout: Duration::from_secs(1),
2002                    notarization_timeout: Duration::from_secs(2),
2003                    nullify_retry: Duration::from_secs(10),
2004                    fetch_timeout: Duration::from_secs(1),
2005                    activity_timeout,
2006                    skip_timeout,
2007                    max_fetch_count: 1,
2008                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2009                    fetch_concurrent: 1,
2010                    replay_buffer: 1024 * 1024,
2011                    write_buffer: 1024 * 1024,
2012                };
2013                let engine = Engine::new(context.with_label("engine"), cfg);
2014
2015                // Start engine
2016                let (pending, recovered, resolver) = registrations
2017                    .remove(&validator)
2018                    .expect("validator should be registered");
2019                engine_handlers.push(engine.start(pending, recovered, resolver));
2020            }
2021
2022            // Wait for all engines to finish
2023            let mut finalizers = Vec::new();
2024            for supervisor in supervisors.iter_mut() {
2025                let (mut latest, mut monitor) = supervisor.subscribe().await;
2026                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2027                    while latest < required_containers {
2028                        latest = monitor.next().await.expect("event missing");
2029                    }
2030                }));
2031            }
2032            join_all(finalizers).await;
2033
2034            // Check supervisors for correct activity
2035            for supervisor in supervisors.iter() {
2036                // Ensure no faults
2037                {
2038                    let faults = supervisor.faults.lock().unwrap();
2039                    assert!(faults.is_empty());
2040                }
2041
2042                // Ensure no invalid signatures
2043                {
2044                    let invalid = supervisor.invalid.lock().unwrap();
2045                    assert_eq!(*invalid, 0);
2046                }
2047            }
2048
2049            // Ensure no blocked connections
2050            let blocked = oracle.blocked().await.unwrap();
2051            assert!(blocked.is_empty());
2052
2053            context.auditor().state()
2054        })
2055    }
2056
2057    #[test_traced]
2058    fn test_slow_and_lossy_links() {
2059        slow_and_lossy_links::<MinPk>(0);
2060        slow_and_lossy_links::<MinSig>(0);
2061    }
2062
2063    #[test_traced]
2064    #[ignore]
2065    fn test_determinism() {
2066        // We use slow and lossy links as the deterministic test
2067        // because it is the most complex test.
2068        for seed in 1..6 {
2069            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
2070            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
2071            assert_eq!(pk_state_1, pk_state_2);
2072
2073            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
2074            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
2075            assert_eq!(sig_state_1, sig_state_2);
2076
2077            // Sanity check that different types can't be identical
2078            assert_ne!(pk_state_1, sig_state_1);
2079        }
2080    }
2081
2082    fn conflicter<V: Variant>(seed: u64) {
2083        // Create context
2084        let n = 4;
2085        let threshold = quorum(n);
2086        let required_containers = 50;
2087        let activity_timeout = 10;
2088        let skip_timeout = 5;
2089        let namespace = b"consensus".to_vec();
2090        let cfg = deterministic::Config::new()
2091            .with_seed(seed)
2092            .with_timeout(Some(Duration::from_secs(30)));
2093        let executor = deterministic::Runner::new(cfg);
2094        executor.start(|mut context| async move {
2095            // Create simulated network
2096            let (network, mut oracle) = Network::new(
2097                context.with_label("network"),
2098                Config {
2099                    max_size: 1024 * 1024,
2100                },
2101            );
2102
2103            // Start network
2104            network.start();
2105
2106            // Register participants
2107            let mut schemes = Vec::new();
2108            let mut validators = Vec::new();
2109            for i in 0..n {
2110                let scheme = PrivateKey::from_seed(i as u64);
2111                let pk = scheme.public_key();
2112                schemes.push(scheme);
2113                validators.push(pk);
2114            }
2115            validators.sort();
2116            schemes.sort_by_key(|s| s.public_key());
2117            let mut registrations = register_validators(&mut oracle, &validators).await;
2118
2119            // Link all validators
2120            let link = Link {
2121                latency: 10.0,
2122                jitter: 1.0,
2123                success_rate: 1.0,
2124            };
2125            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2126
2127            // Derive threshold
2128            let (polynomial, shares) =
2129                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2130
2131            // Create engines
2132            let relay = Arc::new(mocks::relay::Relay::new());
2133            let mut supervisors = Vec::new();
2134            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2135                // Create scheme context
2136                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2137
2138                // Start engine
2139                let validator = scheme.public_key();
2140                let mut participants = BTreeMap::new();
2141                participants.insert(
2142                    0,
2143                    (
2144                        polynomial.clone(),
2145                        validators.clone(),
2146                        shares[idx_scheme].clone(),
2147                    ),
2148                );
2149                let supervisor_config = mocks::supervisor::Config::<_, V> {
2150                    namespace: namespace.clone(),
2151                    participants,
2152                };
2153                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2154                let (pending, recovered, resolver) = registrations
2155                    .remove(&validator)
2156                    .expect("validator should be registered");
2157                if idx_scheme == 0 {
2158                    let cfg = mocks::conflicter::Config {
2159                        supervisor,
2160                        namespace: namespace.clone(),
2161                    };
2162
2163                    let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
2164                        mocks::conflicter::Conflicter::new(
2165                            context.with_label("byzantine_engine"),
2166                            cfg,
2167                        );
2168                    engine.start(pending);
2169                } else {
2170                    supervisors.push(supervisor.clone());
2171                    let application_cfg = mocks::application::Config {
2172                        hasher: Sha256::default(),
2173                        relay: relay.clone(),
2174                        participant: validator.clone(),
2175                        propose_latency: (10.0, 5.0),
2176                        verify_latency: (10.0, 5.0),
2177                    };
2178                    let (actor, application) = mocks::application::Application::new(
2179                        context.with_label("application"),
2180                        application_cfg,
2181                    );
2182                    actor.start();
2183                    let blocker = oracle.control(scheme.public_key());
2184                    let cfg = config::Config {
2185                        crypto: scheme,
2186                        blocker,
2187                        automaton: application.clone(),
2188                        relay: application.clone(),
2189                        reporter: supervisor.clone(),
2190                        supervisor,
2191                        partition: validator.to_string(),
2192                        compression: Some(3),
2193                        mailbox_size: 1024,
2194                        namespace: namespace.clone(),
2195                        leader_timeout: Duration::from_secs(1),
2196                        notarization_timeout: Duration::from_secs(2),
2197                        nullify_retry: Duration::from_secs(10),
2198                        fetch_timeout: Duration::from_secs(1),
2199                        activity_timeout,
2200                        skip_timeout,
2201                        max_fetch_count: 1,
2202                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2203                        fetch_concurrent: 1,
2204                        replay_buffer: 1024 * 1024,
2205                        write_buffer: 1024 * 1024,
2206                    };
2207                    let engine = Engine::new(context.with_label("engine"), cfg);
2208                    engine.start(pending, recovered, resolver);
2209                }
2210            }
2211
2212            // Wait for all engines to finish
2213            let mut finalizers = Vec::new();
2214            for supervisor in supervisors.iter_mut() {
2215                let (mut latest, mut monitor) = supervisor.subscribe().await;
2216                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2217                    while latest < required_containers {
2218                        latest = monitor.next().await.expect("event missing");
2219                    }
2220                }));
2221            }
2222            join_all(finalizers).await;
2223
2224            // Check supervisors for correct activity
2225            let byz = &validators[0];
2226            let mut count_conflicting = 0;
2227            for supervisor in supervisors.iter() {
2228                // Ensure only faults for byz
2229                {
2230                    let faults = supervisor.faults.lock().unwrap();
2231                    assert_eq!(faults.len(), 1);
2232                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2233                    for (_, faults) in faulter.iter() {
2234                        for fault in faults.iter() {
2235                            match fault {
2236                                Activity::ConflictingNotarize(_) => {
2237                                    count_conflicting += 1;
2238                                }
2239                                Activity::ConflictingFinalize(_) => {
2240                                    count_conflicting += 1;
2241                                }
2242                                _ => panic!("unexpected fault: {fault:?}"),
2243                            }
2244                        }
2245                    }
2246                }
2247
2248                // Ensure no invalid signatures
2249                {
2250                    let invalid = supervisor.invalid.lock().unwrap();
2251                    assert_eq!(*invalid, 0);
2252                }
2253            }
2254            assert!(count_conflicting > 0);
2255
2256            // Ensure conflicter is blocked
2257            let blocked = oracle.blocked().await.unwrap();
2258            assert!(!blocked.is_empty());
2259            for (a, b) in blocked {
2260                assert_ne!(&a, byz);
2261                assert_eq!(&b, byz);
2262            }
2263        });
2264    }
2265
2266    #[test_traced]
2267    #[ignore]
2268    fn test_conflicter() {
2269        for seed in 0..5 {
2270            conflicter::<MinPk>(seed);
2271            conflicter::<MinSig>(seed);
2272        }
2273    }
2274
2275    fn invalid<V: Variant>(seed: u64) {
2276        // Create context
2277        let n = 4;
2278        let threshold = quorum(n);
2279        let required_containers = 50;
2280        let activity_timeout = 10;
2281        let skip_timeout = 5;
2282        let namespace = b"consensus".to_vec();
2283        let cfg = deterministic::Config::new()
2284            .with_seed(seed)
2285            .with_timeout(Some(Duration::from_secs(30)));
2286        let executor = deterministic::Runner::new(cfg);
2287        executor.start(|mut context| async move {
2288            // Create simulated network
2289            let (network, mut oracle) = Network::new(
2290                context.with_label("network"),
2291                Config {
2292                    max_size: 1024 * 1024,
2293                },
2294            );
2295
2296            // Start network
2297            network.start();
2298
2299            // Register participants
2300            let mut schemes = Vec::new();
2301            let mut validators = Vec::new();
2302            for i in 0..n {
2303                let scheme = PrivateKey::from_seed(i as u64);
2304                let pk = scheme.public_key();
2305                schemes.push(scheme);
2306                validators.push(pk);
2307            }
2308            validators.sort();
2309            schemes.sort_by_key(|s| s.public_key());
2310            let mut registrations = register_validators(&mut oracle, &validators).await;
2311
2312            // Link all validators
2313            let link = Link {
2314                latency: 10.0,
2315                jitter: 1.0,
2316                success_rate: 1.0,
2317            };
2318            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2319
2320            // Derive threshold
2321            let (polynomial, shares) =
2322                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2323
2324            // Create engines
2325            let relay = Arc::new(mocks::relay::Relay::new());
2326            let mut supervisors = Vec::new();
2327            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2328                // Create scheme context
2329                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2330
2331                // Start engine
2332                let validator = scheme.public_key();
2333                let mut participants = BTreeMap::new();
2334                participants.insert(
2335                    0,
2336                    (
2337                        polynomial.clone(),
2338                        validators.clone(),
2339                        shares[idx_scheme].clone(),
2340                    ),
2341                );
2342                let supervisor_config = mocks::supervisor::Config::<_, V> {
2343                    namespace: namespace.clone(),
2344                    participants,
2345                };
2346                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2347                let (pending, recovered, resolver) = registrations
2348                    .remove(&validator)
2349                    .expect("validator should be registered");
2350                if idx_scheme == 0 {
2351                    let cfg = mocks::invalid::Config {
2352                        supervisor,
2353                        namespace: namespace.clone(),
2354                    };
2355
2356                    let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
2357                        mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
2358                    engine.start(pending);
2359                } else {
2360                    supervisors.push(supervisor.clone());
2361                    let application_cfg = mocks::application::Config {
2362                        hasher: Sha256::default(),
2363                        relay: relay.clone(),
2364                        participant: validator.clone(),
2365                        propose_latency: (10.0, 5.0),
2366                        verify_latency: (10.0, 5.0),
2367                    };
2368                    let (actor, application) = mocks::application::Application::new(
2369                        context.with_label("application"),
2370                        application_cfg,
2371                    );
2372                    actor.start();
2373                    let blocker = oracle.control(scheme.public_key());
2374                    let cfg = config::Config {
2375                        crypto: scheme,
2376                        blocker,
2377                        automaton: application.clone(),
2378                        relay: application.clone(),
2379                        reporter: supervisor.clone(),
2380                        supervisor,
2381                        partition: validator.to_string(),
2382                        compression: Some(3),
2383                        mailbox_size: 1024,
2384                        namespace: namespace.clone(),
2385                        leader_timeout: Duration::from_secs(1),
2386                        notarization_timeout: Duration::from_secs(2),
2387                        nullify_retry: Duration::from_secs(10),
2388                        fetch_timeout: Duration::from_secs(1),
2389                        activity_timeout,
2390                        skip_timeout,
2391                        max_fetch_count: 1,
2392                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2393                        fetch_concurrent: 1,
2394                        replay_buffer: 1024 * 1024,
2395                        write_buffer: 1024 * 1024,
2396                    };
2397                    let engine = Engine::new(context.with_label("engine"), cfg);
2398                    engine.start(pending, recovered, resolver);
2399                }
2400            }
2401
2402            // Wait for all engines to finish
2403            let mut finalizers = Vec::new();
2404            for supervisor in supervisors.iter_mut() {
2405                let (mut latest, mut monitor) = supervisor.subscribe().await;
2406                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2407                    while latest < required_containers {
2408                        latest = monitor.next().await.expect("event missing");
2409                    }
2410                }));
2411            }
2412            join_all(finalizers).await;
2413
2414            // Check supervisors for correct activity
2415            let mut invalid_count = 0;
2416            let byz = &validators[0];
2417            for supervisor in supervisors.iter() {
2418                // Ensure no faults
2419                {
2420                    let faults = supervisor.faults.lock().unwrap();
2421                    assert!(faults.is_empty());
2422                }
2423
2424                // Count invalid signatures
2425                {
2426                    let invalid = supervisor.invalid.lock().unwrap();
2427                    if *invalid > 0 {
2428                        invalid_count += 1;
2429                    }
2430                }
2431            }
2432            assert_eq!(invalid_count, n - 1);
2433
2434            // Ensure invalid is blocked
2435            let blocked = oracle.blocked().await.unwrap();
2436            assert!(!blocked.is_empty());
2437            for (a, b) in blocked {
2438                assert_ne!(&a, byz);
2439                assert_eq!(&b, byz);
2440            }
2441        });
2442    }
2443
2444    #[test_traced]
2445    #[ignore]
2446    fn test_invalid() {
2447        for seed in 0..5 {
2448            invalid::<MinPk>(seed);
2449            invalid::<MinSig>(seed);
2450        }
2451    }
2452
2453    fn impersonator<V: Variant>(seed: u64) {
2454        // Create context
2455        let n = 4;
2456        let threshold = quorum(n);
2457        let required_containers = 50;
2458        let activity_timeout = 10;
2459        let skip_timeout = 5;
2460        let namespace = b"consensus".to_vec();
2461        let cfg = deterministic::Config::new()
2462            .with_seed(seed)
2463            .with_timeout(Some(Duration::from_secs(30)));
2464        let executor = deterministic::Runner::new(cfg);
2465        executor.start(|mut context| async move {
2466            // Create simulated network
2467            let (network, mut oracle) = Network::new(
2468                context.with_label("network"),
2469                Config {
2470                    max_size: 1024 * 1024,
2471                },
2472            );
2473
2474            // Start network
2475            network.start();
2476
2477            // Register participants
2478            let mut schemes = Vec::new();
2479            let mut validators = Vec::new();
2480            for i in 0..n {
2481                let scheme = PrivateKey::from_seed(i as u64);
2482                let pk = scheme.public_key();
2483                schemes.push(scheme);
2484                validators.push(pk);
2485            }
2486            validators.sort();
2487            schemes.sort_by_key(|s| s.public_key());
2488            let mut registrations = register_validators(&mut oracle, &validators).await;
2489
2490            // Link all validators
2491            let link = Link {
2492                latency: 10.0,
2493                jitter: 1.0,
2494                success_rate: 1.0,
2495            };
2496            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2497
2498            // Derive threshold
2499            let (polynomial, shares) =
2500                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2501
2502            // Create engines
2503            let relay = Arc::new(mocks::relay::Relay::new());
2504            let mut supervisors = Vec::new();
2505            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2506                // Create scheme context
2507                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2508
2509                // Start engine
2510                let validator = scheme.public_key();
2511                let mut participants = BTreeMap::new();
2512                participants.insert(
2513                    0,
2514                    (
2515                        polynomial.clone(),
2516                        validators.clone(),
2517                        shares[idx_scheme].clone(),
2518                    ),
2519                );
2520                let supervisor_config = mocks::supervisor::Config::<_, V> {
2521                    namespace: namespace.clone(),
2522                    participants,
2523                };
2524                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2525                let (pending, recovered, resolver) = registrations
2526                    .remove(&validator)
2527                    .expect("validator should be registered");
2528                if idx_scheme == 0 {
2529                    let cfg = mocks::impersonator::Config {
2530                        supervisor,
2531                        namespace: namespace.clone(),
2532                    };
2533
2534                    let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
2535                        mocks::impersonator::Impersonator::new(
2536                            context.with_label("byzantine_engine"),
2537                            cfg,
2538                        );
2539                    engine.start(pending);
2540                } else {
2541                    supervisors.push(supervisor.clone());
2542                    let application_cfg = mocks::application::Config {
2543                        hasher: Sha256::default(),
2544                        relay: relay.clone(),
2545                        participant: validator.clone(),
2546                        propose_latency: (10.0, 5.0),
2547                        verify_latency: (10.0, 5.0),
2548                    };
2549                    let (actor, application) = mocks::application::Application::new(
2550                        context.with_label("application"),
2551                        application_cfg,
2552                    );
2553                    actor.start();
2554                    let blocker = oracle.control(scheme.public_key());
2555                    let cfg = config::Config {
2556                        crypto: scheme,
2557                        blocker,
2558                        automaton: application.clone(),
2559                        relay: application.clone(),
2560                        reporter: supervisor.clone(),
2561                        supervisor,
2562                        partition: validator.to_string(),
2563                        compression: Some(3),
2564                        mailbox_size: 1024,
2565                        namespace: namespace.clone(),
2566                        leader_timeout: Duration::from_secs(1),
2567                        notarization_timeout: Duration::from_secs(2),
2568                        nullify_retry: Duration::from_secs(10),
2569                        fetch_timeout: Duration::from_secs(1),
2570                        activity_timeout,
2571                        skip_timeout,
2572                        max_fetch_count: 1,
2573                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2574                        fetch_concurrent: 1,
2575                        replay_buffer: 1024 * 1024,
2576                        write_buffer: 1024 * 1024,
2577                    };
2578                    let engine = Engine::new(context.with_label("engine"), cfg);
2579                    engine.start(pending, recovered, resolver);
2580                }
2581            }
2582
2583            // Wait for all engines to finish
2584            let mut finalizers = Vec::new();
2585            for supervisor in supervisors.iter_mut() {
2586                let (mut latest, mut monitor) = supervisor.subscribe().await;
2587                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2588                    while latest < required_containers {
2589                        latest = monitor.next().await.expect("event missing");
2590                    }
2591                }));
2592            }
2593            join_all(finalizers).await;
2594
2595            // Check supervisors for correct activity
2596            let byz = &validators[0];
2597            for supervisor in supervisors.iter() {
2598                // Ensure no faults
2599                {
2600                    let faults = supervisor.faults.lock().unwrap();
2601                    assert!(faults.is_empty());
2602                }
2603
2604                // Ensure no invalid signatures
2605                {
2606                    let invalid = supervisor.invalid.lock().unwrap();
2607                    assert_eq!(*invalid, 0);
2608                }
2609            }
2610
2611            // Ensure invalid is blocked
2612            let blocked = oracle.blocked().await.unwrap();
2613            assert!(!blocked.is_empty());
2614            for (a, b) in blocked {
2615                assert_ne!(&a, byz);
2616                assert_eq!(&b, byz);
2617            }
2618        });
2619    }
2620
2621    #[test_traced]
2622    #[ignore]
2623    fn test_impersonator() {
2624        for seed in 0..5 {
2625            impersonator::<MinPk>(seed);
2626            impersonator::<MinSig>(seed);
2627        }
2628    }
2629
2630    fn nuller<V: Variant>(seed: u64) {
2631        // Create context
2632        let n = 4;
2633        let threshold = quorum(n);
2634        let required_containers = 50;
2635        let activity_timeout = 10;
2636        let skip_timeout = 5;
2637        let namespace = b"consensus".to_vec();
2638        let cfg = deterministic::Config::new()
2639            .with_seed(seed)
2640            .with_timeout(Some(Duration::from_secs(30)));
2641        let executor = deterministic::Runner::new(cfg);
2642        executor.start(|mut context| async move {
2643            // Create simulated network
2644            let (network, mut oracle) = Network::new(
2645                context.with_label("network"),
2646                Config {
2647                    max_size: 1024 * 1024,
2648                },
2649            );
2650
2651            // Start network
2652            network.start();
2653
2654            // Register participants
2655            let mut schemes = Vec::new();
2656            let mut validators = Vec::new();
2657            for i in 0..n {
2658                let scheme = PrivateKey::from_seed(i as u64);
2659                let pk = scheme.public_key();
2660                schemes.push(scheme);
2661                validators.push(pk);
2662            }
2663            validators.sort();
2664            schemes.sort_by_key(|s| s.public_key());
2665            let mut registrations = register_validators(&mut oracle, &validators).await;
2666
2667            // Link all validators
2668            let link = Link {
2669                latency: 10.0,
2670                jitter: 1.0,
2671                success_rate: 1.0,
2672            };
2673            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2674
2675            // Derive threshold
2676            let (polynomial, shares) =
2677                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2678
2679            // Create engines
2680            let relay = Arc::new(mocks::relay::Relay::new());
2681            let mut supervisors = Vec::new();
2682            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2683                // Create scheme context
2684                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2685
2686                // Start engine
2687                let validator = scheme.public_key();
2688                let mut participants = BTreeMap::new();
2689                participants.insert(
2690                    0,
2691                    (
2692                        polynomial.clone(),
2693                        validators.clone(),
2694                        shares[idx_scheme].clone(),
2695                    ),
2696                );
2697                let supervisor_config = mocks::supervisor::Config::<_, V> {
2698                    namespace: namespace.clone(),
2699                    participants,
2700                };
2701                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2702                let (pending, recovered, resolver) = registrations
2703                    .remove(&validator)
2704                    .expect("validator should be registered");
2705                if idx_scheme == 0 {
2706                    let cfg = mocks::nuller::Config {
2707                        supervisor,
2708                        namespace: namespace.clone(),
2709                    };
2710                    let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
2711                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2712                    engine.start(pending);
2713                } else {
2714                    supervisors.push(supervisor.clone());
2715                    let application_cfg = mocks::application::Config {
2716                        hasher: Sha256::default(),
2717                        relay: relay.clone(),
2718                        participant: validator.clone(),
2719                        propose_latency: (10.0, 5.0),
2720                        verify_latency: (10.0, 5.0),
2721                    };
2722                    let (actor, application) = mocks::application::Application::new(
2723                        context.with_label("application"),
2724                        application_cfg,
2725                    );
2726                    actor.start();
2727                    let blocker = oracle.control(scheme.public_key());
2728                    let cfg = config::Config {
2729                        crypto: scheme,
2730                        blocker,
2731                        automaton: application.clone(),
2732                        relay: application.clone(),
2733                        reporter: supervisor.clone(),
2734                        supervisor,
2735                        partition: validator.to_string(),
2736                        compression: Some(3),
2737                        mailbox_size: 1024,
2738                        namespace: namespace.clone(),
2739                        leader_timeout: Duration::from_secs(1),
2740                        notarization_timeout: Duration::from_secs(2),
2741                        nullify_retry: Duration::from_secs(10),
2742                        fetch_timeout: Duration::from_secs(1),
2743                        activity_timeout,
2744                        skip_timeout,
2745                        max_fetch_count: 1,
2746                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2747                        fetch_concurrent: 1,
2748                        replay_buffer: 1024 * 1024,
2749                        write_buffer: 1024 * 1024,
2750                    };
2751                    let engine = Engine::new(context.with_label("engine"), cfg);
2752                    engine.start(pending, recovered, resolver);
2753                }
2754            }
2755
2756            // Wait for all engines to finish
2757            let mut finalizers = Vec::new();
2758            for supervisor in supervisors.iter_mut() {
2759                let (mut latest, mut monitor) = supervisor.subscribe().await;
2760                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2761                    while latest < required_containers {
2762                        latest = monitor.next().await.expect("event missing");
2763                    }
2764                }));
2765            }
2766            join_all(finalizers).await;
2767
2768            // Check supervisors for correct activity
2769            let byz = &validators[0];
2770            let mut count_nullify_and_finalize = 0;
2771            for supervisor in supervisors.iter() {
2772                // Ensure only faults for byz
2773                {
2774                    let faults = supervisor.faults.lock().unwrap();
2775                    assert_eq!(faults.len(), 1);
2776                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2777                    for (_, faults) in faulter.iter() {
2778                        for fault in faults.iter() {
2779                            match fault {
2780                                Activity::NullifyFinalize(_) => {
2781                                    count_nullify_and_finalize += 1;
2782                                }
2783                                _ => panic!("unexpected fault: {fault:?}"),
2784                            }
2785                        }
2786                    }
2787                }
2788
2789                // Ensure no invalid signatures
2790                {
2791                    let invalid = supervisor.invalid.lock().unwrap();
2792                    assert_eq!(*invalid, 0);
2793                }
2794            }
2795            assert!(count_nullify_and_finalize > 0);
2796
2797            // Ensure nullifier is blocked
2798            let blocked = oracle.blocked().await.unwrap();
2799            assert!(!blocked.is_empty());
2800            for (a, b) in blocked {
2801                assert_ne!(&a, byz);
2802                assert_eq!(&b, byz);
2803            }
2804        });
2805    }
2806
2807    #[test_traced]
2808    #[ignore]
2809    fn test_nuller() {
2810        for seed in 0..5 {
2811            nuller::<MinPk>(seed);
2812            nuller::<MinSig>(seed);
2813        }
2814    }
2815
2816    fn outdated<V: Variant>(seed: u64) {
2817        // Create context
2818        let n = 4;
2819        let threshold = quorum(n);
2820        let required_containers = 100;
2821        let activity_timeout = 10;
2822        let skip_timeout = 5;
2823        let namespace = b"consensus".to_vec();
2824        let cfg = deterministic::Config::new()
2825            .with_seed(seed)
2826            .with_timeout(Some(Duration::from_secs(30)));
2827        let executor = deterministic::Runner::new(cfg);
2828        executor.start(|mut context| async move {
2829            // Create simulated network
2830            let (network, mut oracle) = Network::new(
2831                context.with_label("network"),
2832                Config {
2833                    max_size: 1024 * 1024,
2834                },
2835            );
2836
2837            // Start network
2838            network.start();
2839
2840            // Register participants
2841            let mut schemes = Vec::new();
2842            let mut validators = Vec::new();
2843            for i in 0..n {
2844                let scheme = PrivateKey::from_seed(i as u64);
2845                let pk = scheme.public_key();
2846                schemes.push(scheme);
2847                validators.push(pk);
2848            }
2849            validators.sort();
2850            schemes.sort_by_key(|s| s.public_key());
2851            let mut registrations = register_validators(&mut oracle, &validators).await;
2852
2853            // Link all validators
2854            let link = Link {
2855                latency: 10.0,
2856                jitter: 1.0,
2857                success_rate: 1.0,
2858            };
2859            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2860
2861            // Derive threshold
2862            let (polynomial, shares) =
2863                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2864
2865            // Create engines
2866            let relay = Arc::new(mocks::relay::Relay::new());
2867            let mut supervisors = Vec::new();
2868            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2869                // Create scheme context
2870                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2871
2872                // Start engine
2873                let validator = scheme.public_key();
2874                let mut participants = BTreeMap::new();
2875                participants.insert(
2876                    0,
2877                    (
2878                        polynomial.clone(),
2879                        validators.clone(),
2880                        shares[idx_scheme].clone(),
2881                    ),
2882                );
2883                let supervisor_config = mocks::supervisor::Config::<_, V> {
2884                    namespace: namespace.clone(),
2885                    participants,
2886                };
2887                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2888                let (pending, recovered, resolver) = registrations
2889                    .remove(&validator)
2890                    .expect("validator should be registered");
2891                if idx_scheme == 0 {
2892                    let cfg = mocks::outdated::Config {
2893                        supervisor,
2894                        namespace: namespace.clone(),
2895                        view_delta: activity_timeout * 4,
2896                    };
2897                    let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
2898                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2899                    engine.start(pending);
2900                } else {
2901                    supervisors.push(supervisor.clone());
2902                    let application_cfg = mocks::application::Config {
2903                        hasher: Sha256::default(),
2904                        relay: relay.clone(),
2905                        participant: validator.clone(),
2906                        propose_latency: (10.0, 5.0),
2907                        verify_latency: (10.0, 5.0),
2908                    };
2909                    let (actor, application) = mocks::application::Application::new(
2910                        context.with_label("application"),
2911                        application_cfg,
2912                    );
2913                    actor.start();
2914                    let blocker = oracle.control(scheme.public_key());
2915                    let cfg = config::Config {
2916                        crypto: scheme,
2917                        blocker,
2918                        automaton: application.clone(),
2919                        relay: application.clone(),
2920                        reporter: supervisor.clone(),
2921                        supervisor,
2922                        partition: validator.to_string(),
2923                        compression: Some(3),
2924                        mailbox_size: 1024,
2925                        namespace: namespace.clone(),
2926                        leader_timeout: Duration::from_secs(1),
2927                        notarization_timeout: Duration::from_secs(2),
2928                        nullify_retry: Duration::from_secs(10),
2929                        fetch_timeout: Duration::from_secs(1),
2930                        activity_timeout,
2931                        skip_timeout,
2932                        max_fetch_count: 1,
2933                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2934                        fetch_concurrent: 1,
2935                        replay_buffer: 1024 * 1024,
2936                        write_buffer: 1024 * 1024,
2937                    };
2938                    let engine = Engine::new(context.with_label("engine"), cfg);
2939                    engine.start(pending, recovered, resolver);
2940                }
2941            }
2942
2943            // Wait for all engines to finish
2944            let mut finalizers = Vec::new();
2945            for supervisor in supervisors.iter_mut() {
2946                let (mut latest, mut monitor) = supervisor.subscribe().await;
2947                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2948                    while latest < required_containers {
2949                        latest = monitor.next().await.expect("event missing");
2950                    }
2951                }));
2952            }
2953            join_all(finalizers).await;
2954
2955            // Check supervisors for correct activity
2956            for supervisor in supervisors.iter() {
2957                // Ensure no faults
2958                {
2959                    let faults = supervisor.faults.lock().unwrap();
2960                    assert!(faults.is_empty());
2961                }
2962
2963                // Ensure no invalid signatures
2964                {
2965                    let invalid = supervisor.invalid.lock().unwrap();
2966                    assert_eq!(*invalid, 0);
2967                }
2968            }
2969
2970            // Ensure no blocked connections
2971            let blocked = oracle.blocked().await.unwrap();
2972            assert!(blocked.is_empty());
2973        });
2974    }
2975
2976    #[test_traced]
2977    #[ignore]
2978    fn test_outdated() {
2979        for seed in 0..5 {
2980            outdated::<MinPk>(seed);
2981            outdated::<MinSig>(seed);
2982        }
2983    }
2984
2985    fn run_1k<V: Variant>() {
2986        // Create context
2987        let n = 10;
2988        let threshold = quorum(n);
2989        let required_containers = 1_000;
2990        let activity_timeout = 10;
2991        let skip_timeout = 5;
2992        let namespace = b"consensus".to_vec();
2993        let cfg = deterministic::Config::new();
2994        let executor = deterministic::Runner::new(cfg);
2995        executor.start(|mut context| async move {
2996            // Create simulated network
2997            let (network, mut oracle) = Network::new(
2998                context.with_label("network"),
2999                Config {
3000                    max_size: 1024 * 1024,
3001                },
3002            );
3003
3004            // Start network
3005            network.start();
3006
3007            // Register participants
3008            let mut schemes = Vec::new();
3009            let mut validators = Vec::new();
3010            for i in 0..n {
3011                let scheme = PrivateKey::from_seed(i as u64);
3012                let pk = scheme.public_key();
3013                schemes.push(scheme);
3014                validators.push(pk);
3015            }
3016            validators.sort();
3017            schemes.sort_by_key(|s| s.public_key());
3018            let mut registrations = register_validators(&mut oracle, &validators).await;
3019
3020            // Link all validators
3021            let link = Link {
3022                latency: 80.0,
3023                jitter: 10.0,
3024                success_rate: 0.98,
3025            };
3026            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3027
3028            // Derive threshold
3029            let (polynomial, shares) =
3030                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3031
3032            // Create engines
3033            let relay = Arc::new(mocks::relay::Relay::new());
3034            let mut supervisors = Vec::new();
3035            let mut engine_handlers = Vec::new();
3036            for (idx, scheme) in schemes.into_iter().enumerate() {
3037                // Create scheme context
3038                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3039
3040                // Configure engine
3041                let validator = scheme.public_key();
3042                let mut participants = BTreeMap::new();
3043                participants.insert(
3044                    0,
3045                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
3046                );
3047                let supervisor_config = mocks::supervisor::Config::<_, V> {
3048                    namespace: namespace.clone(),
3049                    participants,
3050                };
3051                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3052                supervisors.push(supervisor.clone());
3053                let application_cfg = mocks::application::Config {
3054                    hasher: Sha256::default(),
3055                    relay: relay.clone(),
3056                    participant: validator.clone(),
3057                    propose_latency: (100.0, 50.0),
3058                    verify_latency: (50.0, 40.0),
3059                };
3060                let (actor, application) = mocks::application::Application::new(
3061                    context.with_label("application"),
3062                    application_cfg,
3063                );
3064                actor.start();
3065                let blocker = oracle.control(scheme.public_key());
3066                let cfg = config::Config {
3067                    crypto: scheme,
3068                    blocker,
3069                    automaton: application.clone(),
3070                    relay: application.clone(),
3071                    reporter: supervisor.clone(),
3072                    supervisor,
3073                    partition: validator.to_string(),
3074                    compression: Some(3),
3075                    mailbox_size: 1024,
3076                    namespace: namespace.clone(),
3077                    leader_timeout: Duration::from_secs(1),
3078                    notarization_timeout: Duration::from_secs(2),
3079                    nullify_retry: Duration::from_secs(10),
3080                    fetch_timeout: Duration::from_secs(1),
3081                    activity_timeout,
3082                    skip_timeout,
3083                    max_fetch_count: 1,
3084                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3085                    fetch_concurrent: 1,
3086                    replay_buffer: 1024 * 1024,
3087                    write_buffer: 1024 * 1024,
3088                };
3089                let engine = Engine::new(context.with_label("engine"), cfg);
3090
3091                // Start engine
3092                let (pending, recovered, resolver) = registrations
3093                    .remove(&validator)
3094                    .expect("validator should be registered");
3095                engine_handlers.push(engine.start(pending, recovered, resolver));
3096            }
3097
3098            // Wait for all engines to finish
3099            let mut finalizers = Vec::new();
3100            for supervisor in supervisors.iter_mut() {
3101                let (mut latest, mut monitor) = supervisor.subscribe().await;
3102                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3103                    while latest < required_containers {
3104                        latest = monitor.next().await.expect("event missing");
3105                    }
3106                }));
3107            }
3108            join_all(finalizers).await;
3109
3110            // Check supervisors for correct activity
3111            for supervisor in supervisors.iter() {
3112                // Ensure no faults
3113                {
3114                    let faults = supervisor.faults.lock().unwrap();
3115                    assert!(faults.is_empty());
3116                }
3117
3118                // Ensure no invalid signatures
3119                {
3120                    let invalid = supervisor.invalid.lock().unwrap();
3121                    assert_eq!(*invalid, 0);
3122                }
3123            }
3124
3125            // Ensure no blocked connections
3126            let blocked = oracle.blocked().await.unwrap();
3127            assert!(blocked.is_empty());
3128        })
3129    }
3130
3131    #[test_traced]
3132    #[ignore]
3133    fn test_1k() {
3134        run_1k::<MinPk>();
3135        run_1k::<MinSig>();
3136    }
3137
3138    fn tle<V: Variant>() {
3139        // Create context
3140        let n = 4;
3141        let threshold = quorum(n);
3142        let namespace = b"consensus".to_vec();
3143        let activity_timeout = 100;
3144        let skip_timeout = 50;
3145        let executor = deterministic::Runner::timed(Duration::from_secs(30));
3146        executor.start(|mut context| async move {
3147            // Create simulated network
3148            let (network, mut oracle) = Network::new(
3149                context.with_label("network"),
3150                Config {
3151                    max_size: 1024 * 1024,
3152                },
3153            );
3154
3155            // Start network
3156            network.start();
3157
3158            // Register participants
3159            let mut schemes = Vec::new();
3160            let mut validators = Vec::new();
3161            for i in 0..n {
3162                let scheme = PrivateKey::from_seed(i as u64);
3163                let pk = scheme.public_key();
3164                schemes.push(scheme);
3165                validators.push(pk);
3166            }
3167            validators.sort();
3168            schemes.sort_by_key(|s| s.public_key());
3169            let mut registrations = register_validators(&mut oracle, &validators).await;
3170
3171            // Link all validators
3172            let link = Link {
3173                latency: 10.0,
3174                jitter: 5.0,
3175                success_rate: 1.0,
3176            };
3177            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3178
3179            // Derive threshold
3180            let (polynomial, shares) =
3181                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3182            let public_key = *public::<V>(&polynomial);
3183
3184            // Create engines and supervisors
3185            let relay = Arc::new(mocks::relay::Relay::new());
3186            let mut supervisors = Vec::new();
3187            let mut engine_handlers = Vec::new();
3188            let monitor_supervisor = Arc::new(Mutex::new(None));
3189            for (idx, scheme) in schemes.into_iter().enumerate() {
3190                // Create scheme context
3191                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3192
3193                // Configure engine
3194                let validator = scheme.public_key();
3195                let mut participants = BTreeMap::new();
3196                participants.insert(
3197                    0,
3198                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
3199                );
3200
3201                // Store first supervisor for monitoring
3202                let supervisor_config = mocks::supervisor::Config::<_, V> {
3203                    namespace: namespace.clone(),
3204                    participants,
3205                };
3206                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3207                supervisors.push(supervisor.clone());
3208                if idx == 0 {
3209                    *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
3210                }
3211
3212                // Configure application
3213                let application_cfg = mocks::application::Config {
3214                    hasher: Sha256::default(),
3215                    relay: relay.clone(),
3216                    participant: validator.clone(),
3217                    propose_latency: (10.0, 5.0),
3218                    verify_latency: (10.0, 5.0),
3219                };
3220                let (actor, application) = mocks::application::Application::new(
3221                    context.with_label("application"),
3222                    application_cfg,
3223                );
3224                actor.start();
3225                let blocker = oracle.control(scheme.public_key());
3226                let cfg = config::Config {
3227                    crypto: scheme,
3228                    blocker,
3229                    automaton: application.clone(),
3230                    relay: application.clone(),
3231                    reporter: supervisor.clone(),
3232                    supervisor,
3233                    partition: validator.to_string(),
3234                    compression: Some(3),
3235                    mailbox_size: 1024,
3236                    namespace: namespace.clone(),
3237                    leader_timeout: Duration::from_millis(100),
3238                    notarization_timeout: Duration::from_millis(200),
3239                    nullify_retry: Duration::from_millis(500),
3240                    fetch_timeout: Duration::from_millis(100),
3241                    activity_timeout,
3242                    skip_timeout,
3243                    max_fetch_count: 1,
3244                    fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3245                    fetch_concurrent: 1,
3246                    replay_buffer: 1024 * 1024,
3247                    write_buffer: 1024 * 1024,
3248                };
3249                let engine = Engine::new(context.with_label("engine"), cfg);
3250
3251                // Start engine
3252                let (pending, recovered, resolver) = registrations
3253                    .remove(&validator)
3254                    .expect("validator should be registered");
3255                engine_handlers.push(engine.start(pending, recovered, resolver));
3256            }
3257
3258            // Prepare TLE test data
3259            let target = 10u64; // Encrypt for view 10
3260            let target_bytes = target.to_be_bytes();
3261            let message_content = b"Secret message for future view10"; // 32 bytes
3262            let message = Block::new(*message_content);
3263
3264            // Encrypt message for future view using threshold public key
3265            let seed_namespace = seed_namespace(&namespace);
3266            let ciphertext = encrypt::<_, V>(
3267                &mut context,
3268                public_key,
3269                (Some(&seed_namespace), &target_bytes),
3270                &message,
3271            );
3272
3273            // Wait for consensus to reach the target view and then decrypt
3274            let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
3275            loop {
3276                // Wait for notarization
3277                context.sleep(Duration::from_millis(100)).await;
3278                let notarizations = supervisor.notarizations.lock().unwrap();
3279                let Some(notarization) = notarizations.get(&target) else {
3280                    continue;
3281                };
3282
3283                // Decrypt the message using the seed signature
3284                let seed_signature = notarization.seed_signature;
3285                let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3286                    .expect("Decryption should succeed with valid seed signature");
3287                assert_eq!(
3288                    message.as_ref(),
3289                    decrypted.as_ref(),
3290                    "Decrypted message should match original message"
3291                );
3292                break;
3293            }
3294        });
3295    }
3296
3297    #[test_traced]
3298    fn test_tle() {
3299        tle::<MinPk>();
3300        tle::<MinSig>();
3301    }
3302}