commonware_consensus/threshold_simplex/
mod.rs

1//! [Simplex](crate::simplex)-like BFT agreement with an embedded VRF and succinct consensus certificates.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `threshold-simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal finalization
5//! latency in a partially synchronous setting. Unlike Simplex Consensus, however, `threshold-simplex` employs threshold
6//! cryptography (specifically BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum) to generate both
7//! a bias-resistant beacon (for leader election and post-facto execution randomness) and succinct consensus certificates
8//! (any certificate can be verified with just the static public key of the consensus instance) for each view
9//! with zero message overhead (natively integrated).
10//!
11//! _If you wish to deploy Simplex Consensus but can't employ threshold signatures, see
12//! [crate::simplex]._
13//!
14//! # Features
15//!
16//! * Wicked Fast Block Times (2 Network Hops)
17//! * Optimal Finalization Latency (3 Network Hops)
18//! * Externalized Uptime and Fault Proofs
19//! * Decoupled Block Broadcast and Sync
20//! * Lazy Message Verification
21//! * Flexible Block Format
22//! * Embedded VRF for Leader Election and Post-Facto Execution Randomness
23//! * Succinct Consensus Certificates for Notarization, Nullification, and Finality
24//!
25//! # Design
26//!
27//! ## Architecture
28//!
29//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
30//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
31//! is responsible for directing participation in the current view. Lastly, the `Resolver` is responsible for
32//! fetching artifacts from previous views required to verify proposed blocks in the latest view.
33//!
34//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
35//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
36//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
37//!
38//! ```txt
39//!                            +------------+          +++++++++++++++
40//!                            |            +--------->+             +
41//!                            |  Batcher   |          +    Peers    +
42//!                            |            |<---------+             +
43//!                            +-------+----+          +++++++++++++++
44//!                                |   ^
45//!                                |   |
46//!                                |   |
47//!                                |   |
48//!                                v   |
49//! +---------------+           +---------+            +++++++++++++++
50//! |               |<----------+         +----------->+             +
51//! |  Application  |           |  Voter  |            +    Peers    +
52//! |               +---------->|         |<-----------+             +
53//! +---------------+           +--+------+            +++++++++++++++
54//!                                |   ^
55//!                                |   |
56//!                                |   |
57//!                                |   |
58//!                                v   |
59//!                            +-------+----+          +++++++++++++++
60//!                            |            +--------->+             +
61//!                            |  Resolver  |          +    Peers    +
62//!                            |            |<---------+             +
63//!                            +------------+          +++++++++++++++
64//! ```
65//!
66//! ## Joining Consensus
67//!
68//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
69//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
70//! latest view and begin participating in consensus (assuming it can verify blocks).
71//!
72//! ## Persistence
73//!
74//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
75//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
76//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
77//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
78//! on restart (especially in the case of unclean shutdown).
79//!
80//! ## Batched Verification
81//!
82//! Unlike other consensus constructions that verify all incoming messages received from peers,
83//! `threshold-simplex` lazily verifies messages (only when a quorum is met). If an invalid signature
84//! is detected, the `Batcher` will perform repeated bisections over collected messages to find the
85//! offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
86//!
87//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
88//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
89//!
90//! ## Protocol Description
91//!
92//! ### Specification for View `v`
93//!
94//! Upon entering view `v`:
95//! * Determine leader `l` for view `v`
96//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
97//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
98//! * If leader `l`, broadcast `(part(v), notarize(c,v))`
99//!   * If can't propose container in view `v` because missing notarization/nullification for a
100//!     previous view `v_m`, request `v_m`
101//!
102//! Upon receiving first `(part(v), notarize(c,v))` from `l`:
103//! * Cancel `t_l`
104//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
105//!   between `v` and `v_parent`, verify `c` and broadcast `(part(v), notarize(c,v))`
106//!
107//! Upon receiving `2f+1` `(part(v), notarize(c,v))`:
108//! * Cancel `t_a`
109//! * Mark `c` as notarized
110//! * Broadcast `(seed(v), notarization(c,v))` (even if we have not verified `c`)
111//! * If have not broadcast `(part(v), nullify(v))`, broadcast `finalize(c,v)`
112//! * Enter `v+1`
113//!
114//! Upon receiving `2f+1` `(part(v), nullify(v))`:
115//! * Broadcast `(seed(v), nullification(v))`
116//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
117//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
118//!       instead.
119//! * Enter `v+1`
120//!
121//! Upon receiving `2f+1` `finalize(c,v)`:
122//! * Mark `c` as finalized (and recursively finalize its parents)
123//! * Broadcast `(seed(v), finalization(c,v))` (even if we have not verified `c`)
124//!
125//! Upon `t_l` or `t_a` firing:
126//! * Broadcast `(part(v), nullify(v))`
127//! * Every `t_r` after `(part(v), nullify(v))` broadcast that we are still in view `v`:
128//!    * Rebroadcast `(part(v), nullify(v))` and either `(seed(v-1), notarization(v-1))` or `(seed(v-1), nullification(v-1))`
129//!
130//! #### Embedded VRF
131//!
132//! When broadcasting any `notarize(c,v)` or `nullify(v)` message, a participant must also include a `part(v)` message (a partial
133//! signature over the view `v`). After `2f+1` `notarize(c,v)` or `nullify(v)` messages are collected from unique participants,
134//! `seed(v)` can be recovered. Because `part(v)` is only over the view `v`, the seed derived for a given view `v` is the same regardless of
135//! whether or not a block was notarized in said view `v`.
136//!
137//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
138//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
139//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
140//! is used as a seed in `v`).
141//!
142//! #### Succinct Consensus Certificates
143//!
144//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain partial signatures for a static
145//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
146//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
147//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
148//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
149//! to operate a lite client).
150//!
151//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
152//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
153//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
154//!
155//! ### Deviations from Simplex Consensus
156//!
157//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
158//!   a set of all notarizations/nullifications for all historical blocks.
159//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
160//!   either a "block" or a "dummy block", respectively.
161//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
162//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
163//!   some number of views (again to trigger early view transition for an unresponsive leader).
164//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
165//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
166
167use types::View;
168
169pub mod types;
170
171cfg_if::cfg_if! {
172    if #[cfg(not(target_arch = "wasm32"))] {
173        mod actors;
174        mod config;
175        pub use config::Config;
176        mod engine;
177        pub use engine::Engine;
178        mod metrics;
179    }
180}
181
182#[cfg(test)]
183pub mod mocks;
184
185/// The minimum view we are tracking both in-memory and on-disk.
186pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
187    last_finalized.saturating_sub(activity_timeout)
188}
189
190/// Whether or not a view is interesting to us. This is a function
191/// of both `min_active` and whether or not the view is too far
192/// in the future (based on the view we are currently in).
193pub(crate) fn interesting(
194    activity_timeout: View,
195    last_finalized: View,
196    current: View,
197    pending: View,
198    allow_future: bool,
199) -> bool {
200    if pending < min_active(activity_timeout, last_finalized) {
201        return false;
202    }
203    if !allow_future && pending > current + 1 {
204        return false;
205    }
206    true
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use crate::{threshold_simplex::types::seed_namespace, Monitor};
213    use commonware_cryptography::{
214        bls12381::{
215            dkg::ops,
216            primitives::{
217                poly::public,
218                variant::{MinPk, MinSig, Variant},
219            },
220            tle::{decrypt, encrypt, Block},
221        },
222        ed25519::PrivateKey,
223        PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
224    };
225    use commonware_macros::{select, test_traced};
226    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
227    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
228    use commonware_utils::{quorum, NZUsize, NZU32};
229    use engine::Engine;
230    use futures::{future::join_all, StreamExt};
231    use governor::Quota;
232    use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
233    use std::{
234        collections::{BTreeMap, HashMap},
235        num::NonZeroUsize,
236        sync::{Arc, Mutex},
237        time::Duration,
238    };
239    use tracing::{debug, warn};
240    use types::Activity;
241
242    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
243    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
244
245    /// Registers all validators using the oracle.
246    async fn register_validators<P: PublicKey>(
247        oracle: &mut Oracle<P>,
248        validators: &[P],
249    ) -> HashMap<
250        P,
251        (
252            (Sender<P>, Receiver<P>),
253            (Sender<P>, Receiver<P>),
254            (Sender<P>, Receiver<P>),
255        ),
256    > {
257        let mut registrations = HashMap::new();
258        for validator in validators.iter() {
259            let (pending_sender, pending_receiver) =
260                oracle.register(validator.clone(), 0).await.unwrap();
261            let (recovered_sender, recovered_receiver) =
262                oracle.register(validator.clone(), 1).await.unwrap();
263            let (resolver_sender, resolver_receiver) =
264                oracle.register(validator.clone(), 2).await.unwrap();
265            registrations.insert(
266                validator.clone(),
267                (
268                    (pending_sender, pending_receiver),
269                    (recovered_sender, recovered_receiver),
270                    (resolver_sender, resolver_receiver),
271                ),
272            );
273        }
274        registrations
275    }
276
277    /// Enum to describe the action to take when linking validators.
278    enum Action {
279        Link(Link),
280        Update(Link), // Unlink and then link
281        Unlink,
282    }
283
284    /// Links (or unlinks) validators using the oracle.
285    ///
286    /// The `action` parameter determines the action (e.g. link, unlink) to take.
287    /// The `restrict_to` function can be used to restrict the linking to certain connections,
288    /// otherwise all validators will be linked to all other validators.
289    async fn link_validators<P: PublicKey>(
290        oracle: &mut Oracle<P>,
291        validators: &[P],
292        action: Action,
293        restrict_to: Option<fn(usize, usize, usize) -> bool>,
294    ) {
295        for (i1, v1) in validators.iter().enumerate() {
296            for (i2, v2) in validators.iter().enumerate() {
297                // Ignore self
298                if v2 == v1 {
299                    continue;
300                }
301
302                // Restrict to certain connections
303                if let Some(f) = restrict_to {
304                    if !f(validators.len(), i1, i2) {
305                        continue;
306                    }
307                }
308
309                // Do any unlinking first
310                match action {
311                    Action::Update(_) | Action::Unlink => {
312                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
313                    }
314                    _ => {}
315                }
316
317                // Do any linking after
318                match action {
319                    Action::Link(ref link) | Action::Update(ref link) => {
320                        oracle
321                            .add_link(v1.clone(), v2.clone(), link.clone())
322                            .await
323                            .unwrap();
324                    }
325                    _ => {}
326                }
327            }
328        }
329    }
330
331    fn all_online<V: Variant>() {
332        // Create context
333        let n = 5;
334        let threshold = quorum(n);
335        let required_containers = 100;
336        let activity_timeout = 10;
337        let skip_timeout = 5;
338        let namespace = b"consensus".to_vec();
339        let executor = deterministic::Runner::timed(Duration::from_secs(30));
340        executor.start(|mut context| async move {
341            // Create simulated network
342            let (network, mut oracle) = Network::new(
343                context.with_label("network"),
344                Config {
345                    max_size: 1024 * 1024,
346                },
347            );
348
349            // Start network
350            network.start();
351
352            // Register participants
353            let mut schemes = Vec::new();
354            let mut validators = Vec::new();
355            for i in 0..n {
356                let scheme = PrivateKey::from_seed(i as u64);
357                let pk = scheme.public_key();
358                schemes.push(scheme);
359                validators.push(pk);
360            }
361            validators.sort();
362            schemes.sort_by_key(|s| s.public_key());
363            let mut registrations = register_validators(&mut oracle, &validators).await;
364
365            // Link all validators
366            let link = Link {
367                latency: Duration::from_millis(10),
368                jitter: Duration::from_millis(1),
369                success_rate: 1.0,
370            };
371            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
372
373            // Derive threshold
374            let (polynomial, shares) =
375                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
376
377            // Create engines
378            let relay = Arc::new(mocks::relay::Relay::new());
379            let mut supervisors = Vec::new();
380            let mut engine_handlers = Vec::new();
381            for (idx, scheme) in schemes.into_iter().enumerate() {
382                // Create scheme context
383                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
384
385                // Configure engine
386                let validator = scheme.public_key();
387                let mut participants = BTreeMap::new();
388                participants.insert(
389                    0,
390                    (
391                        polynomial.clone(),
392                        validators.clone(),
393                        Some(shares[idx].clone()),
394                    ),
395                );
396                let supervisor_config = mocks::supervisor::Config::<_, V> {
397                    namespace: namespace.clone(),
398                    participants,
399                };
400                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
401                supervisors.push(supervisor.clone());
402                let application_cfg = mocks::application::Config {
403                    hasher: Sha256::default(),
404                    relay: relay.clone(),
405                    participant: validator.clone(),
406                    propose_latency: (10.0, 5.0),
407                    verify_latency: (10.0, 5.0),
408                };
409                let (actor, application) = mocks::application::Application::new(
410                    context.with_label("application"),
411                    application_cfg,
412                );
413                actor.start();
414                let blocker = oracle.control(scheme.public_key());
415                let cfg = config::Config {
416                    crypto: scheme,
417                    blocker,
418                    automaton: application.clone(),
419                    relay: application.clone(),
420                    reporter: supervisor.clone(),
421                    supervisor,
422                    partition: validator.to_string(),
423                    mailbox_size: 1024,
424                    namespace: namespace.clone(),
425                    leader_timeout: Duration::from_secs(1),
426                    notarization_timeout: Duration::from_secs(2),
427                    nullify_retry: Duration::from_secs(10),
428                    fetch_timeout: Duration::from_secs(1),
429                    activity_timeout,
430                    skip_timeout,
431                    max_fetch_count: 1,
432                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
433                    fetch_concurrent: 1,
434                    replay_buffer: NZUsize!(1024 * 1024),
435                    write_buffer: NZUsize!(1024 * 1024),
436                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
437                };
438                let engine = Engine::new(context.with_label("engine"), cfg);
439
440                // Start engine
441                let (pending, recovered, resolver) = registrations
442                    .remove(&validator)
443                    .expect("validator should be registered");
444                engine_handlers.push(engine.start(pending, recovered, resolver));
445            }
446
447            // Wait for all engines to finish
448            let mut finalizers = Vec::new();
449            for supervisor in supervisors.iter_mut() {
450                let (mut latest, mut monitor) = supervisor.subscribe().await;
451                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
452                    while latest < required_containers {
453                        latest = monitor.next().await.expect("event missing");
454                    }
455                }));
456            }
457            join_all(finalizers).await;
458
459            // Check supervisors for correct activity
460            let latest_complete = required_containers - activity_timeout;
461            for supervisor in supervisors.iter() {
462                // Ensure no faults
463                {
464                    let faults = supervisor.faults.lock().unwrap();
465                    assert!(faults.is_empty());
466                }
467
468                // Ensure no invalid signatures
469                {
470                    let invalid = supervisor.invalid.lock().unwrap();
471                    assert_eq!(*invalid, 0);
472                }
473
474                // Ensure seeds for all views
475                {
476                    let seeds = supervisor.seeds.lock().unwrap();
477                    for view in 1..latest_complete {
478                        // Ensure seed for every view
479                        if !seeds.contains_key(&view) {
480                            panic!("view: {view}");
481                        }
482                    }
483                }
484
485                // Ensure no forks
486                let mut notarized = HashMap::new();
487                let mut finalized = HashMap::new();
488                {
489                    let notarizes = supervisor.notarizes.lock().unwrap();
490                    for view in 1..latest_complete {
491                        // Ensure only one payload proposed per view
492                        let Some(payloads) = notarizes.get(&view) else {
493                            continue;
494                        };
495                        if payloads.len() > 1 {
496                            panic!("view: {view}");
497                        }
498                        let (digest, notarizers) = payloads.iter().next().unwrap();
499                        notarized.insert(view, *digest);
500
501                        if notarizers.len() < threshold as usize {
502                            // We can't verify that everyone participated at every view because some nodes may
503                            // have started later.
504                            panic!("view: {view}");
505                        }
506                    }
507                }
508                {
509                    let notarizations = supervisor.notarizations.lock().unwrap();
510                    for view in 1..latest_complete {
511                        // Ensure notarization matches digest from notarizes
512                        let Some(notarization) = notarizations.get(&view) else {
513                            continue;
514                        };
515                        let Some(digest) = notarized.get(&view) else {
516                            continue;
517                        };
518                        assert_eq!(&notarization.proposal.payload, digest);
519                    }
520                }
521                {
522                    let finalizes = supervisor.finalizes.lock().unwrap();
523                    for view in 1..latest_complete {
524                        // Ensure only one payload proposed per view
525                        let Some(payloads) = finalizes.get(&view) else {
526                            continue;
527                        };
528                        if payloads.len() > 1 {
529                            panic!("view: {view}");
530                        }
531                        let (digest, finalizers) = payloads.iter().next().unwrap();
532                        finalized.insert(view, *digest);
533
534                        // Only check at views below timeout
535                        if view > latest_complete {
536                            continue;
537                        }
538
539                        // Ensure everyone participating
540                        if finalizers.len() < threshold as usize {
541                            // We can't verify that everyone participated at every view because some nodes may
542                            // have started later.
543                            panic!("view: {view}");
544                        }
545
546                        // Ensure no nullifies for any finalizers
547                        let nullifies = supervisor.nullifies.lock().unwrap();
548                        let Some(nullifies) = nullifies.get(&view) else {
549                            continue;
550                        };
551                        for (_, finalizers) in payloads.iter() {
552                            for finalizer in finalizers.iter() {
553                                if nullifies.contains(finalizer) {
554                                    panic!("should not nullify and finalize at same view");
555                                }
556                            }
557                        }
558                    }
559                }
560                {
561                    let finalizations = supervisor.finalizations.lock().unwrap();
562                    for view in 1..latest_complete {
563                        // Ensure finalization matches digest from finalizes
564                        let Some(finalization) = finalizations.get(&view) else {
565                            continue;
566                        };
567                        let Some(digest) = finalized.get(&view) else {
568                            continue;
569                        };
570                        assert_eq!(&finalization.proposal.payload, digest);
571                    }
572                }
573            }
574
575            // Ensure no blocked connections
576            let blocked = oracle.blocked().await.unwrap();
577            assert!(blocked.is_empty());
578        });
579    }
580
581    #[test_traced]
582    fn test_all_online() {
583        all_online::<MinPk>();
584        all_online::<MinSig>();
585    }
586
587    fn observer<V: Variant>() {
588        // Create context
589        let n_active = 5;
590        let threshold = quorum(n_active);
591        let required_containers = 100;
592        let activity_timeout = 10;
593        let skip_timeout = 5;
594        let namespace = b"consensus".to_vec();
595        let executor = deterministic::Runner::timed(Duration::from_secs(30));
596        executor.start(|mut context| async move {
597            // Create simulated network
598            let (network, mut oracle) = Network::new(
599                context.with_label("network"),
600                Config {
601                    max_size: 1024 * 1024,
602                },
603            );
604
605            // Start network
606            network.start();
607
608            // Register participants (active)
609            let mut schemes = Vec::new();
610            let mut validators = Vec::new();
611            for i in 0..n_active {
612                let scheme = PrivateKey::from_seed(i as u64);
613                let pk = scheme.public_key();
614                schemes.push(scheme);
615                validators.push(pk);
616            }
617            schemes.sort_by_key(|s| s.public_key());
618            validators.sort();
619
620            // Add observer (no share)
621            let scheme_observer = PrivateKey::from_seed(n_active as u64);
622            let pk_observer = scheme_observer.public_key();
623            schemes.push(scheme_observer);
624
625            // Register all (including observer) with the network
626            let mut all_validators = validators.clone();
627            all_validators.push(pk_observer.clone());
628            all_validators.sort();
629            let mut registrations = register_validators(&mut oracle, &all_validators).await;
630
631            // Link all peers (including observer)
632            let link = Link {
633                latency: Duration::from_millis(10),
634                jitter: Duration::from_millis(1),
635                success_rate: 1.0,
636            };
637            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
638
639            // Derive threshold
640            let (polynomial, shares) =
641                ops::generate_shares::<_, V>(&mut context, None, n_active, threshold);
642
643            // Create engines
644            let relay = Arc::new(mocks::relay::Relay::new());
645            let mut supervisors = Vec::new();
646
647            for (idx, scheme) in schemes.into_iter().enumerate() {
648                let is_observer = scheme.public_key() == pk_observer;
649
650                // Create scheme context
651                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
652
653                // Configure engine
654                let validator = scheme.public_key();
655                let mut participants = BTreeMap::new();
656                let share = if is_observer {
657                    None
658                } else {
659                    Some(shares[idx].clone())
660                };
661                participants.insert(0, (polynomial.clone(), validators.clone(), share));
662                let supervisor_config = mocks::supervisor::Config::<_, V> {
663                    namespace: namespace.clone(),
664                    participants,
665                };
666                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
667                supervisors.push(supervisor.clone());
668                let application_cfg = mocks::application::Config {
669                    hasher: Sha256::default(),
670                    relay: relay.clone(),
671                    participant: validator.clone(),
672                    propose_latency: (10.0, 5.0),
673                    verify_latency: (10.0, 5.0),
674                };
675                let (actor, application) = mocks::application::Application::new(
676                    context.with_label("application"),
677                    application_cfg,
678                );
679                actor.start();
680                let blocker = oracle.control(validator.clone());
681                let cfg = config::Config {
682                    crypto: scheme,
683                    blocker,
684                    automaton: application.clone(),
685                    relay: application.clone(),
686                    reporter: supervisor.clone(),
687                    supervisor,
688                    partition: validator.to_string(),
689                    mailbox_size: 1024,
690                    namespace: namespace.clone(),
691                    leader_timeout: Duration::from_secs(1),
692                    notarization_timeout: Duration::from_secs(2),
693                    nullify_retry: Duration::from_secs(10),
694                    fetch_timeout: Duration::from_secs(1),
695                    activity_timeout,
696                    skip_timeout,
697                    max_fetch_count: 1,
698                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
699                    fetch_concurrent: 1,
700                    replay_buffer: NZUsize!(1024 * 1024),
701                    write_buffer: NZUsize!(1024 * 1024),
702                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
703                };
704                let engine = Engine::new(context.with_label("engine"), cfg);
705
706                // Start engine
707                let (pending, recovered, resolver) = registrations
708                    .remove(&validator)
709                    .expect("validator should be registered");
710                engine.start(pending, recovered, resolver);
711            }
712
713            // Wait for all  engines to finish
714            let mut finalizers = Vec::new();
715            for supervisor in supervisors.iter_mut() {
716                let (mut latest, mut monitor) = supervisor.subscribe().await;
717                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
718                    while latest < required_containers {
719                        latest = monitor.next().await.expect("event missing");
720                    }
721                }));
722            }
723            join_all(finalizers).await;
724
725            // Sanity check
726            for supervisor in supervisors.iter() {
727                // Ensure no faults or invalid signatures
728                {
729                    let faults = supervisor.faults.lock().unwrap();
730                    assert!(faults.is_empty());
731                }
732                {
733                    let invalid = supervisor.invalid.lock().unwrap();
734                    assert_eq!(*invalid, 0);
735                }
736
737                // Ensure no blocked connections
738                let blocked = oracle.blocked().await.unwrap();
739                assert!(blocked.is_empty());
740            }
741        });
742    }
743
744    #[test_traced]
745    fn test_observer() {
746        observer::<MinPk>();
747        observer::<MinSig>();
748    }
749
750    fn unclean_shutdown<V: Variant>() {
751        // Create context
752        let n = 5;
753        let threshold = quorum(n);
754        let required_containers = 100;
755        let activity_timeout = 10;
756        let skip_timeout = 5;
757        let namespace = b"consensus".to_vec();
758
759        // Derive threshold
760        let mut rng = StdRng::seed_from_u64(0);
761        let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);
762
763        // Random restarts every x seconds
764        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
765        let supervised = Arc::new(Mutex::new(Vec::new()));
766        let mut prev_ctx = None;
767
768        loop {
769            let namespace = namespace.clone();
770            let shutdowns = shutdowns.clone();
771            let supervised = supervised.clone();
772            let polynomial = polynomial.clone();
773            let shares = shares.clone();
774
775            let f = |mut context: deterministic::Context| async move {
776                // Create simulated network
777                let (network, mut oracle) = Network::new(
778                    context.with_label("network"),
779                    Config {
780                        max_size: 1024 * 1024,
781                    },
782                );
783
784                // Start network
785                network.start();
786
787                // Register participants
788                let mut schemes = Vec::new();
789                let mut validators = Vec::new();
790                for i in 0..n {
791                    let scheme = PrivateKey::from_seed(i as u64);
792                    let pk = scheme.public_key();
793                    schemes.push(scheme);
794                    validators.push(pk);
795                }
796                validators.sort();
797                schemes.sort_by_key(|s| s.public_key());
798                let mut registrations = register_validators(&mut oracle, &validators).await;
799
800                // Link all validators
801                let link = Link {
802                    latency: Duration::from_millis(50),
803                    jitter: Duration::from_millis(50),
804                    success_rate: 1.0,
805                };
806                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
807
808                // Create engines
809                let relay = Arc::new(mocks::relay::Relay::new());
810                let mut supervisors = HashMap::new();
811                let mut engine_handlers = Vec::new();
812                for (idx, scheme) in schemes.into_iter().enumerate() {
813                    // Create scheme context
814                    let context = context
815                        .clone()
816                        .with_label(&format!("validator-{}", scheme.public_key()));
817
818                    // Configure engine
819                    let validator = scheme.public_key();
820                    let mut participants = BTreeMap::new();
821                    participants.insert(
822                        0,
823                        (
824                            polynomial.clone(),
825                            validators.clone(),
826                            Some(shares[idx].clone()),
827                        ),
828                    );
829                    let supervisor_config = mocks::supervisor::Config::<_, V> {
830                        namespace: namespace.clone(),
831                        participants,
832                    };
833                    let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
834                    supervisors.insert(validator.clone(), supervisor.clone());
835                    let application_cfg = mocks::application::Config {
836                        hasher: Sha256::default(),
837                        relay: relay.clone(),
838                        participant: validator.clone(),
839                        propose_latency: (10.0, 5.0),
840                        verify_latency: (10.0, 5.0),
841                    };
842                    let (actor, application) = mocks::application::Application::new(
843                        context.with_label("application"),
844                        application_cfg,
845                    );
846                    actor.start();
847                    let blocker = oracle.control(scheme.public_key());
848                    let cfg = config::Config {
849                        crypto: scheme,
850                        blocker,
851                        automaton: application.clone(),
852                        relay: application.clone(),
853                        reporter: supervisor.clone(),
854                        supervisor,
855                        partition: validator.to_string(),
856                        mailbox_size: 1024,
857                        namespace: namespace.clone(),
858                        leader_timeout: Duration::from_secs(1),
859                        notarization_timeout: Duration::from_secs(2),
860                        nullify_retry: Duration::from_secs(10),
861                        fetch_timeout: Duration::from_secs(1),
862                        activity_timeout,
863                        skip_timeout,
864                        max_fetch_count: 1,
865                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
866                        fetch_concurrent: 1,
867                        replay_buffer: NZUsize!(1024 * 1024),
868                        write_buffer: NZUsize!(1024 * 1024),
869                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
870                    };
871                    let engine = Engine::new(context.with_label("engine"), cfg);
872
873                    // Start engine
874                    let (pending, recovered, resolver) = registrations
875                        .remove(&validator)
876                        .expect("validator should be registered");
877                    engine_handlers.push(engine.start(pending, recovered, resolver));
878                }
879
880                // Store all finalizer handles
881                let mut finalizers = Vec::new();
882                for (_, supervisor) in supervisors.iter_mut() {
883                    let (mut latest, mut monitor) = supervisor.subscribe().await;
884                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
885                        while latest < required_containers {
886                            latest = monitor.next().await.expect("event missing");
887                        }
888                    }));
889                }
890
891                // Exit at random points for unclean shutdown of entire set
892                let wait =
893                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
894                let result = select! {
895                    _ = context.sleep(wait) => {
896                        // Collect supervisors to check faults
897                        {
898                            let mut shutdowns = shutdowns.lock().unwrap();
899                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
900                            *shutdowns += 1;
901                        }
902                        supervised.lock().unwrap().push(supervisors);
903                        (false,context)
904                    },
905                    _ = join_all(finalizers) => {
906                        // Check supervisors for faults activity
907                        let supervised = supervised.lock().unwrap();
908                        for supervisors in supervised.iter() {
909                            for (_, supervisor) in supervisors.iter() {
910                                let faults = supervisor.faults.lock().unwrap();
911                                assert!(faults.is_empty());
912                            }
913                        }
914                        (true,context)
915                    }
916                };
917
918                // Ensure no blocked connections
919                let blocked = oracle.blocked().await.unwrap();
920                assert!(blocked.is_empty());
921
922                result
923            };
924
925            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
926                deterministic::Runner::from(prev_ctx)
927            } else {
928                deterministic::Runner::timed(Duration::from_secs(30))
929            }
930            .start(f);
931
932            // Check if we should exit
933            if complete {
934                break;
935            }
936
937            prev_ctx = Some(context.recover());
938        }
939    }
940
941    #[test_traced]
942    fn test_unclean_shutdown() {
943        unclean_shutdown::<MinPk>();
944        unclean_shutdown::<MinSig>();
945    }
946
947    fn backfill<V: Variant>() {
948        // Create context
949        let n = 4;
950        let threshold = quorum(n);
951        let required_containers = 100;
952        let activity_timeout = 10;
953        let skip_timeout = 5;
954        let namespace = b"consensus".to_vec();
955        let executor = deterministic::Runner::timed(Duration::from_secs(720));
956        executor.start(|mut context| async move {
957            // Create simulated network
958            let (network, mut oracle) = Network::new(
959                context.with_label("network"),
960                Config {
961                    max_size: 1024 * 1024,
962                },
963            );
964
965            // Start network
966            network.start();
967
968            // Register participants
969            let mut schemes = Vec::new();
970            let mut validators = Vec::new();
971            for i in 0..n {
972                let scheme = PrivateKey::from_seed(i as u64);
973                let pk = scheme.public_key();
974                schemes.push(scheme);
975                validators.push(pk);
976            }
977            validators.sort();
978            schemes.sort_by_key(|s| s.public_key());
979            let mut registrations = register_validators(&mut oracle, &validators).await;
980
981            // Link all validators except first
982            let link = Link {
983                latency: Duration::from_millis(10),
984                jitter: Duration::from_millis(1),
985                success_rate: 1.0,
986            };
987            link_validators(
988                &mut oracle,
989                &validators,
990                Action::Link(link),
991                Some(|_, i, j| ![i, j].contains(&0usize)),
992            )
993            .await;
994
995            // Derive threshold
996            let (polynomial, shares) =
997                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
998
999            // Create engines
1000            let relay = Arc::new(mocks::relay::Relay::new());
1001            let mut supervisors = Vec::new();
1002            let mut engine_handlers = Vec::new();
1003            for (idx_scheme, scheme) in schemes.iter().enumerate() {
1004                // Skip first peer
1005                if idx_scheme == 0 {
1006                    continue;
1007                }
1008
1009                // Create scheme context
1010                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1011
1012                // Configure engine
1013                let validator = scheme.public_key();
1014                let mut participants = BTreeMap::new();
1015                participants.insert(
1016                    0,
1017                    (
1018                        polynomial.clone(),
1019                        validators.clone(),
1020                        Some(shares[idx_scheme].clone()),
1021                    ),
1022                );
1023                let supervisor_config = mocks::supervisor::Config {
1024                    namespace: namespace.clone(),
1025                    participants,
1026                };
1027                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1028                supervisors.push(supervisor.clone());
1029                let application_cfg = mocks::application::Config {
1030                    hasher: Sha256::default(),
1031                    relay: relay.clone(),
1032                    participant: validator.clone(),
1033                    propose_latency: (10.0, 5.0),
1034                    verify_latency: (10.0, 5.0),
1035                };
1036                let (actor, application) = mocks::application::Application::new(
1037                    context.with_label("application"),
1038                    application_cfg,
1039                );
1040                actor.start();
1041                let blocker = oracle.control(scheme.public_key());
1042                let cfg = config::Config {
1043                    crypto: scheme.clone(),
1044                    blocker,
1045                    automaton: application.clone(),
1046                    relay: application.clone(),
1047                    reporter: supervisor.clone(),
1048                    supervisor,
1049                    partition: validator.to_string(),
1050                    mailbox_size: 1024,
1051                    namespace: namespace.clone(),
1052                    leader_timeout: Duration::from_secs(1),
1053                    notarization_timeout: Duration::from_secs(2),
1054                    nullify_retry: Duration::from_secs(10),
1055                    fetch_timeout: Duration::from_secs(1),
1056                    activity_timeout,
1057                    skip_timeout,
1058                    max_fetch_count: 1, // force many fetches
1059                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1060                    fetch_concurrent: 1,
1061                    replay_buffer: NZUsize!(1024 * 1024),
1062                    write_buffer: NZUsize!(1024 * 1024),
1063                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1064                };
1065                let engine = Engine::new(context.with_label("engine"), cfg);
1066
1067                // Start engine
1068                let (pending, recovered, resolver) = registrations
1069                    .remove(&validator)
1070                    .expect("validator should be registered");
1071                engine_handlers.push(engine.start(pending, recovered, resolver));
1072            }
1073
1074            // Wait for all engines to finish
1075            let mut finalizers = Vec::new();
1076            for supervisor in supervisors.iter_mut() {
1077                let (mut latest, mut monitor) = supervisor.subscribe().await;
1078                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1079                    while latest < required_containers {
1080                        latest = monitor.next().await.expect("event missing");
1081                    }
1082                }));
1083            }
1084            join_all(finalizers).await;
1085
1086            // Degrade network connections for online peers
1087            let link = Link {
1088                latency: Duration::from_secs(3),
1089                jitter: Duration::from_millis(0),
1090                success_rate: 1.0,
1091            };
1092            link_validators(
1093                &mut oracle,
1094                &validators,
1095                Action::Update(link.clone()),
1096                Some(|_, i, j| ![i, j].contains(&0usize)),
1097            )
1098            .await;
1099
1100            // Wait for nullifications to accrue
1101            context.sleep(Duration::from_secs(120)).await;
1102
1103            // Unlink second peer from all (except first)
1104            link_validators(
1105                &mut oracle,
1106                &validators,
1107                Action::Unlink,
1108                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1109            )
1110            .await;
1111
1112            // Configure engine for first peer
1113            let scheme = schemes[0].clone();
1114            let validator = scheme.public_key();
1115            let context = context.with_label(&format!("validator-{validator}"));
1116
1117            // Link first peer to all (except second)
1118            link_validators(
1119                &mut oracle,
1120                &validators,
1121                Action::Link(link),
1122                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1123            )
1124            .await;
1125
1126            // Restore network connections for all online peers
1127            let link = Link {
1128                latency: Duration::from_millis(10),
1129                jitter: Duration::from_millis(3),
1130                success_rate: 1.0,
1131            };
1132            link_validators(
1133                &mut oracle,
1134                &validators,
1135                Action::Update(link),
1136                Some(|_, i, j| ![i, j].contains(&1usize)),
1137            )
1138            .await;
1139
1140            // Configure engine
1141            let mut participants = BTreeMap::new();
1142            participants.insert(
1143                0,
1144                (
1145                    polynomial.clone(),
1146                    validators.clone(),
1147                    Some(shares[0].clone()),
1148                ),
1149            );
1150            let supervisor_config = mocks::supervisor::Config::<_, V> {
1151                namespace: namespace.clone(),
1152                participants,
1153            };
1154            let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1155            supervisors.push(supervisor.clone());
1156            let application_cfg = mocks::application::Config {
1157                hasher: Sha256::default(),
1158                relay: relay.clone(),
1159                participant: validator.clone(),
1160                propose_latency: (10.0, 5.0),
1161                verify_latency: (10.0, 5.0),
1162            };
1163            let (actor, application) = mocks::application::Application::new(
1164                context.with_label("application"),
1165                application_cfg,
1166            );
1167            actor.start();
1168            let blocker = oracle.control(scheme.public_key());
1169            let cfg = config::Config {
1170                crypto: scheme,
1171                blocker,
1172                automaton: application.clone(),
1173                relay: application.clone(),
1174                reporter: supervisor.clone(),
1175                supervisor: supervisor.clone(),
1176                partition: validator.to_string(),
1177                mailbox_size: 1024,
1178                namespace: namespace.clone(),
1179                leader_timeout: Duration::from_secs(1),
1180                notarization_timeout: Duration::from_secs(2),
1181                nullify_retry: Duration::from_secs(10),
1182                fetch_timeout: Duration::from_secs(1),
1183                activity_timeout,
1184                skip_timeout,
1185                max_fetch_count: 1,
1186                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1187                fetch_concurrent: 1,
1188                replay_buffer: NZUsize!(1024 * 1024),
1189                write_buffer: NZUsize!(1024 * 1024),
1190                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1191            };
1192            let engine = Engine::new(context.with_label("engine"), cfg);
1193
1194            // Start engine
1195            let (pending, recovered, resolver) = registrations
1196                .remove(&validator)
1197                .expect("validator should be registered");
1198            engine_handlers.push(engine.start(pending, recovered, resolver));
1199
1200            // Wait for new engine to finalize required
1201            let (mut latest, mut monitor) = supervisor.subscribe().await;
1202            while latest < required_containers {
1203                latest = monitor.next().await.expect("event missing");
1204            }
1205
1206            // Ensure no blocked connections
1207            let blocked = oracle.blocked().await.unwrap();
1208            assert!(blocked.is_empty());
1209        });
1210    }
1211
1212    #[test_traced]
1213    fn test_backfill() {
1214        backfill::<MinPk>();
1215        backfill::<MinSig>();
1216    }
1217
1218    fn one_offline<V: Variant>() {
1219        // Create context
1220        let n = 5;
1221        let threshold = quorum(n);
1222        let required_containers = 100;
1223        let activity_timeout = 10;
1224        let skip_timeout = 5;
1225        let max_exceptions = 10;
1226        let namespace = b"consensus".to_vec();
1227        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1228        executor.start(|mut context| async move {
1229            // Create simulated network
1230            let (network, mut oracle) = Network::new(
1231                context.with_label("network"),
1232                Config {
1233                    max_size: 1024 * 1024,
1234                },
1235            );
1236
1237            // Start network
1238            network.start();
1239
1240            // Register participants
1241            let mut schemes = Vec::new();
1242            let mut validators = Vec::new();
1243            for i in 0..n {
1244                let scheme = PrivateKey::from_seed(i as u64);
1245                let pk = scheme.public_key();
1246                schemes.push(scheme);
1247                validators.push(pk);
1248            }
1249            validators.sort();
1250            schemes.sort_by_key(|s| s.public_key());
1251            let mut registrations = register_validators(&mut oracle, &validators).await;
1252
1253            // Link all validators except first
1254            let link = Link {
1255                latency: Duration::from_millis(10),
1256                jitter: Duration::from_millis(1),
1257                success_rate: 1.0,
1258            };
1259            link_validators(
1260                &mut oracle,
1261                &validators,
1262                Action::Link(link),
1263                Some(|_, i, j| ![i, j].contains(&0usize)),
1264            )
1265            .await;
1266
1267            // Derive threshold
1268            let (polynomial, shares) =
1269                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1270
1271            // Create engines
1272            let relay = Arc::new(mocks::relay::Relay::new());
1273            let mut supervisors = Vec::new();
1274            let mut engine_handlers = Vec::new();
1275            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1276                // Skip first peer
1277                if idx_scheme == 0 {
1278                    continue;
1279                }
1280
1281                // Create scheme context
1282                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1283
1284                // Configure engine
1285                let validator = scheme.public_key();
1286                let mut participants = BTreeMap::new();
1287                participants.insert(
1288                    0,
1289                    (
1290                        polynomial.clone(),
1291                        validators.clone(),
1292                        Some(shares[idx_scheme].clone()),
1293                    ),
1294                );
1295                let supervisor_config = mocks::supervisor::Config::<_, V> {
1296                    namespace: namespace.clone(),
1297                    participants,
1298                };
1299                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1300                supervisors.push(supervisor.clone());
1301                let application_cfg = mocks::application::Config {
1302                    hasher: Sha256::default(),
1303                    relay: relay.clone(),
1304                    participant: validator.clone(),
1305                    propose_latency: (10.0, 5.0),
1306                    verify_latency: (10.0, 5.0),
1307                };
1308                let (actor, application) = mocks::application::Application::new(
1309                    context.with_label("application"),
1310                    application_cfg,
1311                );
1312                actor.start();
1313                let blocker = oracle.control(scheme.public_key());
1314                let cfg = config::Config {
1315                    crypto: scheme,
1316                    blocker,
1317                    automaton: application.clone(),
1318                    relay: application.clone(),
1319                    reporter: supervisor.clone(),
1320                    supervisor,
1321                    partition: validator.to_string(),
1322                    mailbox_size: 1024,
1323                    namespace: namespace.clone(),
1324                    leader_timeout: Duration::from_secs(1),
1325                    notarization_timeout: Duration::from_secs(2),
1326                    nullify_retry: Duration::from_secs(10),
1327                    fetch_timeout: Duration::from_secs(1),
1328                    activity_timeout,
1329                    skip_timeout,
1330                    max_fetch_count: 1,
1331                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1332                    fetch_concurrent: 1,
1333                    replay_buffer: NZUsize!(1024 * 1024),
1334                    write_buffer: NZUsize!(1024 * 1024),
1335                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1336                };
1337                let engine = Engine::new(context.with_label("engine"), cfg);
1338
1339                // Start engine
1340                let (pending, recovered, resolver) = registrations
1341                    .remove(&validator)
1342                    .expect("validator should be registered");
1343                engine_handlers.push(engine.start(pending, recovered, resolver));
1344            }
1345
1346            // Wait for all engines to finish
1347            let mut finalizers = Vec::new();
1348            for supervisor in supervisors.iter_mut() {
1349                let (mut latest, mut monitor) = supervisor.subscribe().await;
1350                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1351                    while latest < required_containers {
1352                        latest = monitor.next().await.expect("event missing");
1353                    }
1354                }));
1355            }
1356            join_all(finalizers).await;
1357
1358            // Check supervisors for correct activity
1359            let exceptions = 0;
1360            let offline = &validators[0];
1361            for supervisor in supervisors.iter() {
1362                // Ensure no faults
1363                {
1364                    let faults = supervisor.faults.lock().unwrap();
1365                    assert!(faults.is_empty());
1366                }
1367
1368                // Ensure no invalid signatures
1369                {
1370                    let invalid = supervisor.invalid.lock().unwrap();
1371                    assert_eq!(*invalid, 0);
1372                }
1373
1374                // Ensure offline node is never active
1375                let mut exceptions = 0;
1376                {
1377                    let notarizes = supervisor.notarizes.lock().unwrap();
1378                    for (view, payloads) in notarizes.iter() {
1379                        for (_, participants) in payloads.iter() {
1380                            if participants.contains(offline) {
1381                                panic!("view: {view}");
1382                            }
1383                        }
1384                    }
1385                }
1386                {
1387                    let nullifies = supervisor.nullifies.lock().unwrap();
1388                    for (view, participants) in nullifies.iter() {
1389                        if participants.contains(offline) {
1390                            panic!("view: {view}");
1391                        }
1392                    }
1393                }
1394                {
1395                    let finalizes = supervisor.finalizes.lock().unwrap();
1396                    for (view, payloads) in finalizes.iter() {
1397                        for (_, finalizers) in payloads.iter() {
1398                            if finalizers.contains(offline) {
1399                                panic!("view: {view}");
1400                            }
1401                        }
1402                    }
1403                }
1404
1405                // Identify offline views
1406                let mut offline_views = Vec::new();
1407                {
1408                    let leaders = supervisor.leaders.lock().unwrap();
1409                    for (view, leader) in leaders.iter() {
1410                        if leader == offline {
1411                            offline_views.push(*view);
1412                        }
1413                    }
1414                }
1415                assert!(!offline_views.is_empty());
1416
1417                // Ensure nullifies/nullification collected for offline node
1418                {
1419                    let nullifies = supervisor.nullifies.lock().unwrap();
1420                    for view in offline_views.iter() {
1421                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1422                        if nullifies < threshold as usize {
1423                            warn!("missing expected view nullifies: {}", view);
1424                            exceptions += 1;
1425                        }
1426                    }
1427                }
1428                {
1429                    let nullifications = supervisor.nullifications.lock().unwrap();
1430                    for view in offline_views.iter() {
1431                        if !nullifications.contains_key(view) {
1432                            warn!("missing expected view nullifies: {}", view);
1433                            exceptions += 1;
1434                        }
1435                    }
1436                }
1437
1438                // Ensure exceptions within allowed
1439                assert!(exceptions <= max_exceptions);
1440            }
1441            assert!(exceptions <= max_exceptions);
1442
1443            // Ensure no blocked connections
1444            let blocked = oracle.blocked().await.unwrap();
1445            assert!(blocked.is_empty());
1446
1447            // Ensure we are skipping views
1448            let encoded = context.encode();
1449            let lines = encoded.lines();
1450            let mut skipped_views = 0;
1451            let mut nodes_skipping = 0;
1452            for line in lines {
1453                if line.contains("_engine_voter_skipped_views_total") {
1454                    let parts: Vec<&str> = line.split_whitespace().collect();
1455                    if let Some(number_str) = parts.last() {
1456                        if let Ok(number) = number_str.parse::<u64>() {
1457                            if number > 0 {
1458                                nodes_skipping += 1;
1459                            }
1460                            if number > skipped_views {
1461                                skipped_views = number;
1462                            }
1463                        }
1464                    }
1465                }
1466            }
1467            assert!(
1468                skipped_views > 0,
1469                "expected skipped views to be greater than 0"
1470            );
1471            assert_eq!(
1472                nodes_skipping,
1473                n - 1,
1474                "expected all online nodes to be skipping views"
1475            );
1476        });
1477    }
1478
1479    #[test_traced]
1480    fn test_one_offline() {
1481        one_offline::<MinPk>();
1482        one_offline::<MinSig>();
1483    }
1484
1485    fn slow_validator<V: Variant>() {
1486        // Create context
1487        let n = 5;
1488        let threshold = quorum(n);
1489        let required_containers = 50;
1490        let activity_timeout = 10;
1491        let skip_timeout = 5;
1492        let namespace = b"consensus".to_vec();
1493        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1494        executor.start(|mut context| async move {
1495            // Create simulated network
1496            let (network, mut oracle) = Network::new(
1497                context.with_label("network"),
1498                Config {
1499                    max_size: 1024 * 1024,
1500                },
1501            );
1502
1503            // Start network
1504            network.start();
1505
1506            // Register participants
1507            let mut schemes = Vec::new();
1508            let mut validators = Vec::new();
1509            for i in 0..n {
1510                let scheme = PrivateKey::from_seed(i as u64);
1511                let pk = scheme.public_key();
1512                schemes.push(scheme);
1513                validators.push(pk);
1514            }
1515            validators.sort();
1516            schemes.sort_by_key(|s| s.public_key());
1517            let mut registrations = register_validators(&mut oracle, &validators).await;
1518
1519            // Link all validators
1520            let link = Link {
1521                latency: Duration::from_millis(10),
1522                jitter: Duration::from_millis(1),
1523                success_rate: 1.0,
1524            };
1525            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1526
1527            // Derive threshold
1528            let (polynomial, shares) =
1529                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1530
1531            // Create engines
1532            let relay = Arc::new(mocks::relay::Relay::new());
1533            let mut supervisors = Vec::new();
1534            let mut engine_handlers = Vec::new();
1535            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1536                // Create scheme context
1537                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1538
1539                // Configure engine
1540                let validator = scheme.public_key();
1541                let mut participants = BTreeMap::new();
1542                participants.insert(
1543                    0,
1544                    (
1545                        polynomial.clone(),
1546                        validators.clone(),
1547                        Some(shares[idx_scheme].clone()),
1548                    ),
1549                );
1550                let supervisor_config = mocks::supervisor::Config::<_, V> {
1551                    namespace: namespace.clone(),
1552                    participants,
1553                };
1554                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1555                supervisors.push(supervisor.clone());
1556                let application_cfg = if idx_scheme == 0 {
1557                    mocks::application::Config {
1558                        hasher: Sha256::default(),
1559                        relay: relay.clone(),
1560                        participant: validator.clone(),
1561                        propose_latency: (10_000.0, 0.0),
1562                        verify_latency: (10_000.0, 5.0),
1563                    }
1564                } else {
1565                    mocks::application::Config {
1566                        hasher: Sha256::default(),
1567                        relay: relay.clone(),
1568                        participant: validator.clone(),
1569                        propose_latency: (10.0, 5.0),
1570                        verify_latency: (10.0, 5.0),
1571                    }
1572                };
1573                let (actor, application) = mocks::application::Application::new(
1574                    context.with_label("application"),
1575                    application_cfg,
1576                );
1577                actor.start();
1578                let blocker = oracle.control(scheme.public_key());
1579                let cfg = config::Config {
1580                    crypto: scheme,
1581                    blocker,
1582                    automaton: application.clone(),
1583                    relay: application.clone(),
1584                    reporter: supervisor.clone(),
1585                    supervisor,
1586                    partition: validator.to_string(),
1587                    mailbox_size: 1024,
1588                    namespace: namespace.clone(),
1589                    leader_timeout: Duration::from_secs(1),
1590                    notarization_timeout: Duration::from_secs(2),
1591                    nullify_retry: Duration::from_secs(10),
1592                    fetch_timeout: Duration::from_secs(1),
1593                    activity_timeout,
1594                    skip_timeout,
1595                    max_fetch_count: 1,
1596                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1597                    fetch_concurrent: 1,
1598                    replay_buffer: NZUsize!(1024 * 1024),
1599                    write_buffer: NZUsize!(1024 * 1024),
1600                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1601                };
1602                let engine = Engine::new(context.with_label("engine"), cfg);
1603
1604                // Start engine
1605                let (pending, recovered, resolver) = registrations
1606                    .remove(&validator)
1607                    .expect("validator should be registered");
1608                engine_handlers.push(engine.start(pending, recovered, resolver));
1609            }
1610
1611            // Wait for all engines to finish
1612            let mut finalizers = Vec::new();
1613            for supervisor in supervisors.iter_mut() {
1614                let (mut latest, mut monitor) = supervisor.subscribe().await;
1615                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1616                    while latest < required_containers {
1617                        latest = monitor.next().await.expect("event missing");
1618                    }
1619                }));
1620            }
1621            join_all(finalizers).await;
1622
1623            // Check supervisors for correct activity
1624            let slow = &validators[0];
1625            for supervisor in supervisors.iter() {
1626                // Ensure no faults
1627                {
1628                    let faults = supervisor.faults.lock().unwrap();
1629                    assert!(faults.is_empty());
1630                }
1631
1632                // Ensure no invalid signatures
1633                {
1634                    let invalid = supervisor.invalid.lock().unwrap();
1635                    assert_eq!(*invalid, 0);
1636                }
1637
1638                // Ensure slow node never emits a notarize or finalize (will never finish verification in a timely manner)
1639                {
1640                    let notarizes = supervisor.notarizes.lock().unwrap();
1641                    for (view, payloads) in notarizes.iter() {
1642                        for (_, participants) in payloads.iter() {
1643                            if participants.contains(slow) {
1644                                panic!("view: {view}");
1645                            }
1646                        }
1647                    }
1648                }
1649                {
1650                    let finalizes = supervisor.finalizes.lock().unwrap();
1651                    for (view, payloads) in finalizes.iter() {
1652                        for (_, finalizers) in payloads.iter() {
1653                            if finalizers.contains(slow) {
1654                                panic!("view: {view}");
1655                            }
1656                        }
1657                    }
1658                }
1659            }
1660
1661            // Ensure no blocked connections
1662            let blocked = oracle.blocked().await.unwrap();
1663            assert!(blocked.is_empty());
1664        });
1665    }
1666
1667    #[test_traced]
1668    fn test_slow_validator() {
1669        slow_validator::<MinPk>();
1670        slow_validator::<MinSig>();
1671    }
1672
1673    fn all_recovery<V: Variant>() {
1674        // Create context
1675        let n = 5;
1676        let threshold = quorum(n);
1677        let required_containers = 100;
1678        let activity_timeout = 10;
1679        let skip_timeout = 2;
1680        let namespace = b"consensus".to_vec();
1681        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1682        executor.start(|mut context| async move {
1683            // Create simulated network
1684            let (network, mut oracle) = Network::new(
1685                context.with_label("network"),
1686                Config {
1687                    max_size: 1024 * 1024,
1688                },
1689            );
1690
1691            // Start network
1692            network.start();
1693
1694            // Register participants
1695            let mut schemes = Vec::new();
1696            let mut validators = Vec::new();
1697            for i in 0..n {
1698                let scheme = PrivateKey::from_seed(i as u64);
1699                let pk = scheme.public_key();
1700                schemes.push(scheme);
1701                validators.push(pk);
1702            }
1703            validators.sort();
1704            schemes.sort_by_key(|s| s.public_key());
1705            let mut registrations = register_validators(&mut oracle, &validators).await;
1706
1707            // Link all validators
1708            let link = Link {
1709                latency: Duration::from_secs(3),
1710                jitter: Duration::from_millis(0),
1711                success_rate: 1.0,
1712            };
1713            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1714
1715            // Derive threshold
1716            let (polynomial, shares) =
1717                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1718
1719            // Create engines
1720            let relay = Arc::new(mocks::relay::Relay::new());
1721            let mut supervisors = Vec::new();
1722            let mut engine_handlers = Vec::new();
1723            for (idx, scheme) in schemes.iter().enumerate() {
1724                // Create scheme context
1725                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1726
1727                // Configure engine
1728                let validator = scheme.public_key();
1729                let mut participants = BTreeMap::new();
1730                participants.insert(
1731                    0,
1732                    (
1733                        polynomial.clone(),
1734                        validators.clone(),
1735                        Some(shares[idx].clone()),
1736                    ),
1737                );
1738                let supervisor_config = mocks::supervisor::Config::<_, V> {
1739                    namespace: namespace.clone(),
1740                    participants,
1741                };
1742                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1743                supervisors.push(supervisor.clone());
1744                let application_cfg = mocks::application::Config {
1745                    hasher: Sha256::default(),
1746                    relay: relay.clone(),
1747                    participant: validator.clone(),
1748                    propose_latency: (10.0, 5.0),
1749                    verify_latency: (10.0, 5.0),
1750                };
1751                let (actor, application) = mocks::application::Application::new(
1752                    context.with_label("application"),
1753                    application_cfg,
1754                );
1755                actor.start();
1756                let blocker = oracle.control(scheme.public_key());
1757                let cfg = config::Config {
1758                    crypto: scheme.clone(),
1759                    blocker,
1760                    automaton: application.clone(),
1761                    relay: application.clone(),
1762                    reporter: supervisor.clone(),
1763                    supervisor,
1764                    partition: validator.to_string(),
1765                    mailbox_size: 1024,
1766                    namespace: namespace.clone(),
1767                    leader_timeout: Duration::from_secs(1),
1768                    notarization_timeout: Duration::from_secs(2),
1769                    nullify_retry: Duration::from_secs(10),
1770                    fetch_timeout: Duration::from_secs(1),
1771                    activity_timeout,
1772                    skip_timeout,
1773                    max_fetch_count: 1,
1774                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1775                    fetch_concurrent: 1,
1776                    replay_buffer: NZUsize!(1024 * 1024),
1777                    write_buffer: NZUsize!(1024 * 1024),
1778                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1779                };
1780                let engine = Engine::new(context.with_label("engine"), cfg);
1781
1782                // Start engine
1783                let (pending, recovered, resolver) = registrations
1784                    .remove(&validator)
1785                    .expect("validator should be registered");
1786                engine_handlers.push(engine.start(pending, recovered, resolver));
1787            }
1788
1789            // Wait for a few virtual minutes (shouldn't finalize anything)
1790            let mut finalizers = Vec::new();
1791            for supervisor in supervisors.iter_mut() {
1792                let (_, mut monitor) = supervisor.subscribe().await;
1793                finalizers.push(
1794                    context
1795                        .with_label("finalizer")
1796                        .spawn(move |context| async move {
1797                            select! {
1798                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1799                                _done = monitor.next() => {
1800                                    panic!("engine should not notarize or finalize anything");
1801                                }
1802                            }
1803                        }),
1804                );
1805            }
1806            join_all(finalizers).await;
1807
1808            // Unlink all validators to get latest view
1809            link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1810
1811            // Wait for a virtual minute (nothing should happen)
1812            context.sleep(Duration::from_secs(60)).await;
1813
1814            // Get latest view
1815            let mut latest = 0;
1816            for supervisor in supervisors.iter() {
1817                let nullifies = supervisor.nullifies.lock().unwrap();
1818                let max = nullifies.keys().max().unwrap();
1819                if *max > latest {
1820                    latest = *max;
1821                }
1822            }
1823
1824            // Update links
1825            let link = Link {
1826                latency: Duration::from_millis(10),
1827                jitter: Duration::from_millis(1),
1828                success_rate: 1.0,
1829            };
1830            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1831
1832            // Wait for all engines to finish
1833            let mut finalizers = Vec::new();
1834            for supervisor in supervisors.iter_mut() {
1835                let (mut latest, mut monitor) = supervisor.subscribe().await;
1836                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1837                    while latest < required_containers {
1838                        latest = monitor.next().await.expect("event missing");
1839                    }
1840                }));
1841            }
1842            join_all(finalizers).await;
1843
1844            // Check supervisors for correct activity
1845            for supervisor in supervisors.iter() {
1846                // Ensure no faults
1847                {
1848                    let faults = supervisor.faults.lock().unwrap();
1849                    assert!(faults.is_empty());
1850                }
1851
1852                // Ensure no invalid signatures
1853                {
1854                    let invalid = supervisor.invalid.lock().unwrap();
1855                    assert_eq!(*invalid, 0);
1856                }
1857
1858                // Ensure quick recovery.
1859                //
1860                // If the skip timeout isn't implemented correctly, we may go many views before participants
1861                // start to consider a validator's proposal.
1862                {
1863                    // Ensure nearly all views around latest finalize
1864                    let mut found = 0;
1865                    let finalizations = supervisor.finalizations.lock().unwrap();
1866                    for i in latest..latest + activity_timeout {
1867                        if finalizations.contains_key(&i) {
1868                            found += 1;
1869                        }
1870                    }
1871                    assert!(found >= activity_timeout - 2, "found: {found}");
1872                }
1873            }
1874
1875            // Ensure no blocked connections
1876            let blocked = oracle.blocked().await.unwrap();
1877            assert!(blocked.is_empty());
1878        });
1879    }
1880
1881    #[test_traced]
1882    fn test_all_recovery() {
1883        all_recovery::<MinPk>();
1884        all_recovery::<MinSig>();
1885    }
1886
1887    fn partition<V: Variant>() {
1888        // Create context
1889        let n = 10;
1890        let threshold = quorum(n);
1891        let required_containers = 50;
1892        let activity_timeout = 10;
1893        let skip_timeout = 5;
1894        let namespace = b"consensus".to_vec();
1895        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1896        executor.start(|mut context| async move {
1897            // Create simulated network
1898            let (network, mut oracle) = Network::new(
1899                context.with_label("network"),
1900                Config {
1901                    max_size: 1024 * 1024,
1902                },
1903            );
1904
1905            // Start network
1906            network.start();
1907
1908            // Register participants
1909            let mut schemes = Vec::new();
1910            let mut validators = Vec::new();
1911            for i in 0..n {
1912                let scheme = PrivateKey::from_seed(i as u64);
1913                let pk = scheme.public_key();
1914                schemes.push(scheme);
1915                validators.push(pk);
1916            }
1917            validators.sort();
1918            schemes.sort_by_key(|s| s.public_key());
1919            let mut registrations = register_validators(&mut oracle, &validators).await;
1920
1921            // Link all validators
1922            let link = Link {
1923                latency: Duration::from_millis(10),
1924                jitter: Duration::from_millis(1),
1925                success_rate: 1.0,
1926            };
1927            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1928
1929            // Derive threshold
1930            let (polynomial, shares) =
1931                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1932
1933            // Create engines
1934            let relay = Arc::new(mocks::relay::Relay::new());
1935            let mut supervisors = Vec::new();
1936            let mut engine_handlers = Vec::new();
1937            for (idx, scheme) in schemes.iter().enumerate() {
1938                // Create scheme context
1939                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1940
1941                // Configure engine
1942                let validator = scheme.public_key();
1943                let mut participants = BTreeMap::new();
1944                participants.insert(
1945                    0,
1946                    (
1947                        polynomial.clone(),
1948                        validators.clone(),
1949                        Some(shares[idx].clone()),
1950                    ),
1951                );
1952                let supervisor_config = mocks::supervisor::Config::<_, V> {
1953                    namespace: namespace.clone(),
1954                    participants,
1955                };
1956                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1957                supervisors.push(supervisor.clone());
1958                let application_cfg = mocks::application::Config {
1959                    hasher: Sha256::default(),
1960                    relay: relay.clone(),
1961                    participant: validator.clone(),
1962                    propose_latency: (10.0, 5.0),
1963                    verify_latency: (10.0, 5.0),
1964                };
1965                let (actor, application) = mocks::application::Application::new(
1966                    context.with_label("application"),
1967                    application_cfg,
1968                );
1969                actor.start();
1970                let blocker = oracle.control(scheme.public_key());
1971                let cfg = config::Config {
1972                    crypto: scheme.clone(),
1973                    blocker,
1974                    automaton: application.clone(),
1975                    relay: application.clone(),
1976                    reporter: supervisor.clone(),
1977                    supervisor,
1978                    partition: validator.to_string(),
1979                    mailbox_size: 1024,
1980                    namespace: namespace.clone(),
1981                    leader_timeout: Duration::from_secs(1),
1982                    notarization_timeout: Duration::from_secs(2),
1983                    nullify_retry: Duration::from_secs(10),
1984                    fetch_timeout: Duration::from_secs(1),
1985                    activity_timeout,
1986                    skip_timeout,
1987                    max_fetch_count: 1,
1988                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1989                    fetch_concurrent: 1,
1990                    replay_buffer: NZUsize!(1024 * 1024),
1991                    write_buffer: NZUsize!(1024 * 1024),
1992                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1993                };
1994                let engine = Engine::new(context.with_label("engine"), cfg);
1995
1996                // Start engine
1997                let (pending, recovered, resolver) = registrations
1998                    .remove(&validator)
1999                    .expect("validator should be registered");
2000                engine_handlers.push(engine.start(pending, recovered, resolver));
2001            }
2002
2003            // Wait for all engines to finish
2004            let mut finalizers = Vec::new();
2005            for supervisor in supervisors.iter_mut() {
2006                let (mut latest, mut monitor) = supervisor.subscribe().await;
2007                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2008                    while latest < required_containers {
2009                        latest = monitor.next().await.expect("event missing");
2010                    }
2011                }));
2012            }
2013            join_all(finalizers).await;
2014
2015            // Cut all links between validator halves
2016            fn separated(n: usize, a: usize, b: usize) -> bool {
2017                let m = n / 2;
2018                (a < m && b >= m) || (a >= m && b < m)
2019            }
2020            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
2021
2022            // Wait for any in-progress notarizations/finalizations to finish
2023            context.sleep(Duration::from_secs(10)).await;
2024
2025            // Wait for a few virtual minutes (shouldn't finalize anything)
2026            let mut finalizers = Vec::new();
2027            for supervisor in supervisors.iter_mut() {
2028                let (_, mut monitor) = supervisor.subscribe().await;
2029                finalizers.push(
2030                    context
2031                        .with_label("finalizer")
2032                        .spawn(move |context| async move {
2033                            select! {
2034                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2035                                _done = monitor.next() => {
2036                                    panic!("engine should not notarize or finalize anything");
2037                                }
2038                            }
2039                        }),
2040                );
2041            }
2042            join_all(finalizers).await;
2043
2044            // Restore links
2045            link_validators(
2046                &mut oracle,
2047                &validators,
2048                Action::Link(link),
2049                Some(separated),
2050            )
2051            .await;
2052
2053            // Wait for all engines to finish
2054            let mut finalizers = Vec::new();
2055            for supervisor in supervisors.iter_mut() {
2056                let (mut latest, mut monitor) = supervisor.subscribe().await;
2057                let required = latest + required_containers;
2058                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2059                    while latest < required {
2060                        latest = monitor.next().await.expect("event missing");
2061                    }
2062                }));
2063            }
2064            join_all(finalizers).await;
2065
2066            // Check supervisors for correct activity
2067            for supervisor in supervisors.iter() {
2068                // Ensure no faults
2069                {
2070                    let faults = supervisor.faults.lock().unwrap();
2071                    assert!(faults.is_empty());
2072                }
2073
2074                // Ensure no invalid signatures
2075                {
2076                    let invalid = supervisor.invalid.lock().unwrap();
2077                    assert_eq!(*invalid, 0);
2078                }
2079            }
2080
2081            // Ensure no blocked connections
2082            let blocked = oracle.blocked().await.unwrap();
2083            assert!(blocked.is_empty());
2084        });
2085    }
2086
2087    #[test_traced]
2088    #[ignore]
2089    fn test_partition() {
2090        partition::<MinPk>();
2091        partition::<MinSig>();
2092    }
2093
2094    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
2095        // Create context
2096        let n = 5;
2097        let threshold = quorum(n);
2098        let required_containers = 50;
2099        let activity_timeout = 10;
2100        let skip_timeout = 5;
2101        let namespace = b"consensus".to_vec();
2102        let cfg = deterministic::Config::new()
2103            .with_seed(seed)
2104            .with_timeout(Some(Duration::from_secs(5_000)));
2105        let executor = deterministic::Runner::new(cfg);
2106        executor.start(|mut context| async move {
2107            // Create simulated network
2108            let (network, mut oracle) = Network::new(
2109                context.with_label("network"),
2110                Config {
2111                    max_size: 1024 * 1024,
2112                },
2113            );
2114
2115            // Start network
2116            network.start();
2117
2118            // Register participants
2119            let mut schemes = Vec::new();
2120            let mut validators = Vec::new();
2121            for i in 0..n {
2122                let scheme = PrivateKey::from_seed(i as u64);
2123                let pk = scheme.public_key();
2124                schemes.push(scheme);
2125                validators.push(pk);
2126            }
2127            validators.sort();
2128            schemes.sort_by_key(|s| s.public_key());
2129            let mut registrations = register_validators(&mut oracle, &validators).await;
2130
2131            // Link all validators
2132            let degraded_link = Link {
2133                latency: Duration::from_millis(200),
2134                jitter: Duration::from_millis(150),
2135                success_rate: 0.5,
2136            };
2137            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
2138
2139            // Derive threshold
2140            let (polynomial, shares) =
2141                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2142
2143            // Create engines
2144            let relay = Arc::new(mocks::relay::Relay::new());
2145            let mut supervisors = Vec::new();
2146            let mut engine_handlers = Vec::new();
2147            for (idx, scheme) in schemes.into_iter().enumerate() {
2148                // Create scheme context
2149                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2150
2151                // Configure engine
2152                let validator = scheme.public_key();
2153                let mut participants = BTreeMap::new();
2154                participants.insert(
2155                    0,
2156                    (
2157                        polynomial.clone(),
2158                        validators.clone(),
2159                        Some(shares[idx].clone()),
2160                    ),
2161                );
2162                let supervisor_config = mocks::supervisor::Config::<_, V> {
2163                    namespace: namespace.clone(),
2164                    participants,
2165                };
2166                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2167                supervisors.push(supervisor.clone());
2168                let application_cfg = mocks::application::Config {
2169                    hasher: Sha256::default(),
2170                    relay: relay.clone(),
2171                    participant: validator.clone(),
2172                    propose_latency: (10.0, 5.0),
2173                    verify_latency: (10.0, 5.0),
2174                };
2175                let (actor, application) = mocks::application::Application::new(
2176                    context.with_label("application"),
2177                    application_cfg,
2178                );
2179                actor.start();
2180                let blocker = oracle.control(scheme.public_key());
2181                let cfg = config::Config {
2182                    crypto: scheme,
2183                    blocker,
2184                    automaton: application.clone(),
2185                    relay: application.clone(),
2186                    reporter: supervisor.clone(),
2187                    supervisor,
2188                    partition: validator.to_string(),
2189                    mailbox_size: 1024,
2190                    namespace: namespace.clone(),
2191                    leader_timeout: Duration::from_secs(1),
2192                    notarization_timeout: Duration::from_secs(2),
2193                    nullify_retry: Duration::from_secs(10),
2194                    fetch_timeout: Duration::from_secs(1),
2195                    activity_timeout,
2196                    skip_timeout,
2197                    max_fetch_count: 1,
2198                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2199                    fetch_concurrent: 1,
2200                    replay_buffer: NZUsize!(1024 * 1024),
2201                    write_buffer: NZUsize!(1024 * 1024),
2202                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2203                };
2204                let engine = Engine::new(context.with_label("engine"), cfg);
2205
2206                // Start engine
2207                let (pending, recovered, resolver) = registrations
2208                    .remove(&validator)
2209                    .expect("validator should be registered");
2210                engine_handlers.push(engine.start(pending, recovered, resolver));
2211            }
2212
2213            // Wait for all engines to finish
2214            let mut finalizers = Vec::new();
2215            for supervisor in supervisors.iter_mut() {
2216                let (mut latest, mut monitor) = supervisor.subscribe().await;
2217                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2218                    while latest < required_containers {
2219                        latest = monitor.next().await.expect("event missing");
2220                    }
2221                }));
2222            }
2223            join_all(finalizers).await;
2224
2225            // Check supervisors for correct activity
2226            for supervisor in supervisors.iter() {
2227                // Ensure no faults
2228                {
2229                    let faults = supervisor.faults.lock().unwrap();
2230                    assert!(faults.is_empty());
2231                }
2232
2233                // Ensure no invalid signatures
2234                {
2235                    let invalid = supervisor.invalid.lock().unwrap();
2236                    assert_eq!(*invalid, 0);
2237                }
2238            }
2239
2240            // Ensure no blocked connections
2241            let blocked = oracle.blocked().await.unwrap();
2242            assert!(blocked.is_empty());
2243
2244            context.auditor().state()
2245        })
2246    }
2247
2248    #[test_traced]
2249    fn test_slow_and_lossy_links() {
2250        slow_and_lossy_links::<MinPk>(0);
2251        slow_and_lossy_links::<MinSig>(0);
2252    }
2253
2254    #[test_traced]
2255    #[ignore]
2256    fn test_determinism() {
2257        // We use slow and lossy links as the deterministic test
2258        // because it is the most complex test.
2259        for seed in 1..6 {
2260            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
2261            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
2262            assert_eq!(pk_state_1, pk_state_2);
2263
2264            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
2265            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
2266            assert_eq!(sig_state_1, sig_state_2);
2267
2268            // Sanity check that different types can't be identical
2269            assert_ne!(pk_state_1, sig_state_1);
2270        }
2271    }
2272
2273    fn conflicter<V: Variant>(seed: u64) {
2274        // Create context
2275        let n = 4;
2276        let threshold = quorum(n);
2277        let required_containers = 50;
2278        let activity_timeout = 10;
2279        let skip_timeout = 5;
2280        let namespace = b"consensus".to_vec();
2281        let cfg = deterministic::Config::new()
2282            .with_seed(seed)
2283            .with_timeout(Some(Duration::from_secs(30)));
2284        let executor = deterministic::Runner::new(cfg);
2285        executor.start(|mut context| async move {
2286            // Create simulated network
2287            let (network, mut oracle) = Network::new(
2288                context.with_label("network"),
2289                Config {
2290                    max_size: 1024 * 1024,
2291                },
2292            );
2293
2294            // Start network
2295            network.start();
2296
2297            // Register participants
2298            let mut schemes = Vec::new();
2299            let mut validators = Vec::new();
2300            for i in 0..n {
2301                let scheme = PrivateKey::from_seed(i as u64);
2302                let pk = scheme.public_key();
2303                schemes.push(scheme);
2304                validators.push(pk);
2305            }
2306            validators.sort();
2307            schemes.sort_by_key(|s| s.public_key());
2308            let mut registrations = register_validators(&mut oracle, &validators).await;
2309
2310            // Link all validators
2311            let link = Link {
2312                latency: Duration::from_millis(10),
2313                jitter: Duration::from_millis(1),
2314                success_rate: 1.0,
2315            };
2316            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2317
2318            // Derive threshold
2319            let (polynomial, shares) =
2320                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2321
2322            // Create engines
2323            let relay = Arc::new(mocks::relay::Relay::new());
2324            let mut supervisors = Vec::new();
2325            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2326                // Create scheme context
2327                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2328
2329                // Start engine
2330                let validator = scheme.public_key();
2331                let mut participants = BTreeMap::new();
2332                participants.insert(
2333                    0,
2334                    (
2335                        polynomial.clone(),
2336                        validators.clone(),
2337                        Some(shares[idx_scheme].clone()),
2338                    ),
2339                );
2340                let supervisor_config = mocks::supervisor::Config::<_, V> {
2341                    namespace: namespace.clone(),
2342                    participants,
2343                };
2344                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2345                let (pending, recovered, resolver) = registrations
2346                    .remove(&validator)
2347                    .expect("validator should be registered");
2348                if idx_scheme == 0 {
2349                    let cfg = mocks::conflicter::Config {
2350                        supervisor,
2351                        namespace: namespace.clone(),
2352                    };
2353
2354                    let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
2355                        mocks::conflicter::Conflicter::new(
2356                            context.with_label("byzantine_engine"),
2357                            cfg,
2358                        );
2359                    engine.start(pending);
2360                } else {
2361                    supervisors.push(supervisor.clone());
2362                    let application_cfg = mocks::application::Config {
2363                        hasher: Sha256::default(),
2364                        relay: relay.clone(),
2365                        participant: validator.clone(),
2366                        propose_latency: (10.0, 5.0),
2367                        verify_latency: (10.0, 5.0),
2368                    };
2369                    let (actor, application) = mocks::application::Application::new(
2370                        context.with_label("application"),
2371                        application_cfg,
2372                    );
2373                    actor.start();
2374                    let blocker = oracle.control(scheme.public_key());
2375                    let cfg = config::Config {
2376                        crypto: scheme,
2377                        blocker,
2378                        automaton: application.clone(),
2379                        relay: application.clone(),
2380                        reporter: supervisor.clone(),
2381                        supervisor,
2382                        partition: validator.to_string(),
2383                        mailbox_size: 1024,
2384                        namespace: namespace.clone(),
2385                        leader_timeout: Duration::from_secs(1),
2386                        notarization_timeout: Duration::from_secs(2),
2387                        nullify_retry: Duration::from_secs(10),
2388                        fetch_timeout: Duration::from_secs(1),
2389                        activity_timeout,
2390                        skip_timeout,
2391                        max_fetch_count: 1,
2392                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2393                        fetch_concurrent: 1,
2394                        replay_buffer: NZUsize!(1024 * 1024),
2395                        write_buffer: NZUsize!(1024 * 1024),
2396                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2397                    };
2398                    let engine = Engine::new(context.with_label("engine"), cfg);
2399                    engine.start(pending, recovered, resolver);
2400                }
2401            }
2402
2403            // Wait for all engines to finish
2404            let mut finalizers = Vec::new();
2405            for supervisor in supervisors.iter_mut() {
2406                let (mut latest, mut monitor) = supervisor.subscribe().await;
2407                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2408                    while latest < required_containers {
2409                        latest = monitor.next().await.expect("event missing");
2410                    }
2411                }));
2412            }
2413            join_all(finalizers).await;
2414
2415            // Check supervisors for correct activity
2416            let byz = &validators[0];
2417            let mut count_conflicting = 0;
2418            for supervisor in supervisors.iter() {
2419                // Ensure only faults for byz
2420                {
2421                    let faults = supervisor.faults.lock().unwrap();
2422                    assert_eq!(faults.len(), 1);
2423                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2424                    for (_, faults) in faulter.iter() {
2425                        for fault in faults.iter() {
2426                            match fault {
2427                                Activity::ConflictingNotarize(_) => {
2428                                    count_conflicting += 1;
2429                                }
2430                                Activity::ConflictingFinalize(_) => {
2431                                    count_conflicting += 1;
2432                                }
2433                                _ => panic!("unexpected fault: {fault:?}"),
2434                            }
2435                        }
2436                    }
2437                }
2438
2439                // Ensure no invalid signatures
2440                {
2441                    let invalid = supervisor.invalid.lock().unwrap();
2442                    assert_eq!(*invalid, 0);
2443                }
2444            }
2445            assert!(count_conflicting > 0);
2446
2447            // Ensure conflicter is blocked
2448            let blocked = oracle.blocked().await.unwrap();
2449            assert!(!blocked.is_empty());
2450            for (a, b) in blocked {
2451                assert_ne!(&a, byz);
2452                assert_eq!(&b, byz);
2453            }
2454        });
2455    }
2456
2457    #[test_traced]
2458    #[ignore]
2459    fn test_conflicter() {
2460        for seed in 0..5 {
2461            conflicter::<MinPk>(seed);
2462            conflicter::<MinSig>(seed);
2463        }
2464    }
2465
2466    fn invalid<V: Variant>(seed: u64) {
2467        // Create context
2468        let n = 4;
2469        let threshold = quorum(n);
2470        let required_containers = 50;
2471        let activity_timeout = 10;
2472        let skip_timeout = 5;
2473        let namespace = b"consensus".to_vec();
2474        let cfg = deterministic::Config::new()
2475            .with_seed(seed)
2476            .with_timeout(Some(Duration::from_secs(30)));
2477        let executor = deterministic::Runner::new(cfg);
2478        executor.start(|mut context| async move {
2479            // Create simulated network
2480            let (network, mut oracle) = Network::new(
2481                context.with_label("network"),
2482                Config {
2483                    max_size: 1024 * 1024,
2484                },
2485            );
2486
2487            // Start network
2488            network.start();
2489
2490            // Register participants
2491            let mut schemes = Vec::new();
2492            let mut validators = Vec::new();
2493            for i in 0..n {
2494                let scheme = PrivateKey::from_seed(i as u64);
2495                let pk = scheme.public_key();
2496                schemes.push(scheme);
2497                validators.push(pk);
2498            }
2499            validators.sort();
2500            schemes.sort_by_key(|s| s.public_key());
2501            let mut registrations = register_validators(&mut oracle, &validators).await;
2502
2503            // Link all validators
2504            let link = Link {
2505                latency: Duration::from_millis(10),
2506                jitter: Duration::from_millis(1),
2507                success_rate: 1.0,
2508            };
2509            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2510
2511            // Derive threshold
2512            let (polynomial, shares) =
2513                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2514
2515            // Create engines
2516            let relay = Arc::new(mocks::relay::Relay::new());
2517            let mut supervisors = Vec::new();
2518            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2519                // Create scheme context
2520                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2521
2522                // Start engine
2523                let validator = scheme.public_key();
2524                let mut participants = BTreeMap::new();
2525                participants.insert(
2526                    0,
2527                    (
2528                        polynomial.clone(),
2529                        validators.clone(),
2530                        Some(shares[idx_scheme].clone()),
2531                    ),
2532                );
2533                let supervisor_config = mocks::supervisor::Config::<_, V> {
2534                    namespace: namespace.clone(),
2535                    participants,
2536                };
2537                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2538                let (pending, recovered, resolver) = registrations
2539                    .remove(&validator)
2540                    .expect("validator should be registered");
2541                if idx_scheme == 0 {
2542                    let cfg = mocks::invalid::Config {
2543                        supervisor,
2544                        namespace: namespace.clone(),
2545                    };
2546
2547                    let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
2548                        mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
2549                    engine.start(pending);
2550                } else {
2551                    supervisors.push(supervisor.clone());
2552                    let application_cfg = mocks::application::Config {
2553                        hasher: Sha256::default(),
2554                        relay: relay.clone(),
2555                        participant: validator.clone(),
2556                        propose_latency: (10.0, 5.0),
2557                        verify_latency: (10.0, 5.0),
2558                    };
2559                    let (actor, application) = mocks::application::Application::new(
2560                        context.with_label("application"),
2561                        application_cfg,
2562                    );
2563                    actor.start();
2564                    let blocker = oracle.control(scheme.public_key());
2565                    let cfg = config::Config {
2566                        crypto: scheme,
2567                        blocker,
2568                        automaton: application.clone(),
2569                        relay: application.clone(),
2570                        reporter: supervisor.clone(),
2571                        supervisor,
2572                        partition: validator.to_string(),
2573                        mailbox_size: 1024,
2574                        namespace: namespace.clone(),
2575                        leader_timeout: Duration::from_secs(1),
2576                        notarization_timeout: Duration::from_secs(2),
2577                        nullify_retry: Duration::from_secs(10),
2578                        fetch_timeout: Duration::from_secs(1),
2579                        activity_timeout,
2580                        skip_timeout,
2581                        max_fetch_count: 1,
2582                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2583                        fetch_concurrent: 1,
2584                        replay_buffer: NZUsize!(1024 * 1024),
2585                        write_buffer: NZUsize!(1024 * 1024),
2586                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2587                    };
2588                    let engine = Engine::new(context.with_label("engine"), cfg);
2589                    engine.start(pending, recovered, resolver);
2590                }
2591            }
2592
2593            // Wait for all engines to finish
2594            let mut finalizers = Vec::new();
2595            for supervisor in supervisors.iter_mut() {
2596                let (mut latest, mut monitor) = supervisor.subscribe().await;
2597                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2598                    while latest < required_containers {
2599                        latest = monitor.next().await.expect("event missing");
2600                    }
2601                }));
2602            }
2603            join_all(finalizers).await;
2604
2605            // Check supervisors for correct activity
2606            let mut invalid_count = 0;
2607            let byz = &validators[0];
2608            for supervisor in supervisors.iter() {
2609                // Ensure no faults
2610                {
2611                    let faults = supervisor.faults.lock().unwrap();
2612                    assert!(faults.is_empty());
2613                }
2614
2615                // Count invalid signatures
2616                {
2617                    let invalid = supervisor.invalid.lock().unwrap();
2618                    if *invalid > 0 {
2619                        invalid_count += 1;
2620                    }
2621                }
2622            }
2623            assert_eq!(invalid_count, n - 1);
2624
2625            // Ensure invalid is blocked
2626            let blocked = oracle.blocked().await.unwrap();
2627            assert!(!blocked.is_empty());
2628            for (a, b) in blocked {
2629                assert_ne!(&a, byz);
2630                assert_eq!(&b, byz);
2631            }
2632        });
2633    }
2634
2635    #[test_traced]
2636    #[ignore]
2637    fn test_invalid() {
2638        for seed in 0..5 {
2639            invalid::<MinPk>(seed);
2640            invalid::<MinSig>(seed);
2641        }
2642    }
2643
2644    fn impersonator<V: Variant>(seed: u64) {
2645        // Create context
2646        let n = 4;
2647        let threshold = quorum(n);
2648        let required_containers = 50;
2649        let activity_timeout = 10;
2650        let skip_timeout = 5;
2651        let namespace = b"consensus".to_vec();
2652        let cfg = deterministic::Config::new()
2653            .with_seed(seed)
2654            .with_timeout(Some(Duration::from_secs(30)));
2655        let executor = deterministic::Runner::new(cfg);
2656        executor.start(|mut context| async move {
2657            // Create simulated network
2658            let (network, mut oracle) = Network::new(
2659                context.with_label("network"),
2660                Config {
2661                    max_size: 1024 * 1024,
2662                },
2663            );
2664
2665            // Start network
2666            network.start();
2667
2668            // Register participants
2669            let mut schemes = Vec::new();
2670            let mut validators = Vec::new();
2671            for i in 0..n {
2672                let scheme = PrivateKey::from_seed(i as u64);
2673                let pk = scheme.public_key();
2674                schemes.push(scheme);
2675                validators.push(pk);
2676            }
2677            validators.sort();
2678            schemes.sort_by_key(|s| s.public_key());
2679            let mut registrations = register_validators(&mut oracle, &validators).await;
2680
2681            // Link all validators
2682            let link = Link {
2683                latency: Duration::from_millis(10),
2684                jitter: Duration::from_millis(1),
2685                success_rate: 1.0,
2686            };
2687            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2688
2689            // Derive threshold
2690            let (polynomial, shares) =
2691                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2692
2693            // Create engines
2694            let relay = Arc::new(mocks::relay::Relay::new());
2695            let mut supervisors = Vec::new();
2696            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2697                // Create scheme context
2698                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2699
2700                // Start engine
2701                let validator = scheme.public_key();
2702                let mut participants = BTreeMap::new();
2703                participants.insert(
2704                    0,
2705                    (
2706                        polynomial.clone(),
2707                        validators.clone(),
2708                        Some(shares[idx_scheme].clone()),
2709                    ),
2710                );
2711                let supervisor_config = mocks::supervisor::Config::<_, V> {
2712                    namespace: namespace.clone(),
2713                    participants,
2714                };
2715                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2716                let (pending, recovered, resolver) = registrations
2717                    .remove(&validator)
2718                    .expect("validator should be registered");
2719                if idx_scheme == 0 {
2720                    let cfg = mocks::impersonator::Config {
2721                        supervisor,
2722                        namespace: namespace.clone(),
2723                    };
2724
2725                    let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
2726                        mocks::impersonator::Impersonator::new(
2727                            context.with_label("byzantine_engine"),
2728                            cfg,
2729                        );
2730                    engine.start(pending);
2731                } else {
2732                    supervisors.push(supervisor.clone());
2733                    let application_cfg = mocks::application::Config {
2734                        hasher: Sha256::default(),
2735                        relay: relay.clone(),
2736                        participant: validator.clone(),
2737                        propose_latency: (10.0, 5.0),
2738                        verify_latency: (10.0, 5.0),
2739                    };
2740                    let (actor, application) = mocks::application::Application::new(
2741                        context.with_label("application"),
2742                        application_cfg,
2743                    );
2744                    actor.start();
2745                    let blocker = oracle.control(scheme.public_key());
2746                    let cfg = config::Config {
2747                        crypto: scheme,
2748                        blocker,
2749                        automaton: application.clone(),
2750                        relay: application.clone(),
2751                        reporter: supervisor.clone(),
2752                        supervisor,
2753                        partition: validator.to_string(),
2754                        mailbox_size: 1024,
2755                        namespace: namespace.clone(),
2756                        leader_timeout: Duration::from_secs(1),
2757                        notarization_timeout: Duration::from_secs(2),
2758                        nullify_retry: Duration::from_secs(10),
2759                        fetch_timeout: Duration::from_secs(1),
2760                        activity_timeout,
2761                        skip_timeout,
2762                        max_fetch_count: 1,
2763                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2764                        fetch_concurrent: 1,
2765                        replay_buffer: NZUsize!(1024 * 1024),
2766                        write_buffer: NZUsize!(1024 * 1024),
2767                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2768                    };
2769                    let engine = Engine::new(context.with_label("engine"), cfg);
2770                    engine.start(pending, recovered, resolver);
2771                }
2772            }
2773
2774            // Wait for all engines to finish
2775            let mut finalizers = Vec::new();
2776            for supervisor in supervisors.iter_mut() {
2777                let (mut latest, mut monitor) = supervisor.subscribe().await;
2778                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2779                    while latest < required_containers {
2780                        latest = monitor.next().await.expect("event missing");
2781                    }
2782                }));
2783            }
2784            join_all(finalizers).await;
2785
2786            // Check supervisors for correct activity
2787            let byz = &validators[0];
2788            for supervisor in supervisors.iter() {
2789                // Ensure no faults
2790                {
2791                    let faults = supervisor.faults.lock().unwrap();
2792                    assert!(faults.is_empty());
2793                }
2794
2795                // Ensure no invalid signatures
2796                {
2797                    let invalid = supervisor.invalid.lock().unwrap();
2798                    assert_eq!(*invalid, 0);
2799                }
2800            }
2801
2802            // Ensure invalid is blocked
2803            let blocked = oracle.blocked().await.unwrap();
2804            assert!(!blocked.is_empty());
2805            for (a, b) in blocked {
2806                assert_ne!(&a, byz);
2807                assert_eq!(&b, byz);
2808            }
2809        });
2810    }
2811
2812    #[test_traced]
2813    #[ignore]
2814    fn test_impersonator() {
2815        for seed in 0..5 {
2816            impersonator::<MinPk>(seed);
2817            impersonator::<MinSig>(seed);
2818        }
2819    }
2820
2821    fn nuller<V: Variant>(seed: u64) {
2822        // Create context
2823        let n = 4;
2824        let threshold = quorum(n);
2825        let required_containers = 50;
2826        let activity_timeout = 10;
2827        let skip_timeout = 5;
2828        let namespace = b"consensus".to_vec();
2829        let cfg = deterministic::Config::new()
2830            .with_seed(seed)
2831            .with_timeout(Some(Duration::from_secs(30)));
2832        let executor = deterministic::Runner::new(cfg);
2833        executor.start(|mut context| async move {
2834            // Create simulated network
2835            let (network, mut oracle) = Network::new(
2836                context.with_label("network"),
2837                Config {
2838                    max_size: 1024 * 1024,
2839                },
2840            );
2841
2842            // Start network
2843            network.start();
2844
2845            // Register participants
2846            let mut schemes = Vec::new();
2847            let mut validators = Vec::new();
2848            for i in 0..n {
2849                let scheme = PrivateKey::from_seed(i as u64);
2850                let pk = scheme.public_key();
2851                schemes.push(scheme);
2852                validators.push(pk);
2853            }
2854            validators.sort();
2855            schemes.sort_by_key(|s| s.public_key());
2856            let mut registrations = register_validators(&mut oracle, &validators).await;
2857
2858            // Link all validators
2859            let link = Link {
2860                latency: Duration::from_millis(10),
2861                jitter: Duration::from_millis(1),
2862                success_rate: 1.0,
2863            };
2864            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2865
2866            // Derive threshold
2867            let (polynomial, shares) =
2868                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2869
2870            // Create engines
2871            let relay = Arc::new(mocks::relay::Relay::new());
2872            let mut supervisors = Vec::new();
2873            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2874                // Create scheme context
2875                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2876
2877                // Start engine
2878                let validator = scheme.public_key();
2879                let mut participants = BTreeMap::new();
2880                participants.insert(
2881                    0,
2882                    (
2883                        polynomial.clone(),
2884                        validators.clone(),
2885                        Some(shares[idx_scheme].clone()),
2886                    ),
2887                );
2888                let supervisor_config = mocks::supervisor::Config::<_, V> {
2889                    namespace: namespace.clone(),
2890                    participants,
2891                };
2892                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2893                let (pending, recovered, resolver) = registrations
2894                    .remove(&validator)
2895                    .expect("validator should be registered");
2896                if idx_scheme == 0 {
2897                    let cfg = mocks::nuller::Config {
2898                        supervisor,
2899                        namespace: namespace.clone(),
2900                    };
2901                    let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
2902                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2903                    engine.start(pending);
2904                } else {
2905                    supervisors.push(supervisor.clone());
2906                    let application_cfg = mocks::application::Config {
2907                        hasher: Sha256::default(),
2908                        relay: relay.clone(),
2909                        participant: validator.clone(),
2910                        propose_latency: (10.0, 5.0),
2911                        verify_latency: (10.0, 5.0),
2912                    };
2913                    let (actor, application) = mocks::application::Application::new(
2914                        context.with_label("application"),
2915                        application_cfg,
2916                    );
2917                    actor.start();
2918                    let blocker = oracle.control(scheme.public_key());
2919                    let cfg = config::Config {
2920                        crypto: scheme,
2921                        blocker,
2922                        automaton: application.clone(),
2923                        relay: application.clone(),
2924                        reporter: supervisor.clone(),
2925                        supervisor,
2926                        partition: validator.to_string(),
2927                        mailbox_size: 1024,
2928                        namespace: namespace.clone(),
2929                        leader_timeout: Duration::from_secs(1),
2930                        notarization_timeout: Duration::from_secs(2),
2931                        nullify_retry: Duration::from_secs(10),
2932                        fetch_timeout: Duration::from_secs(1),
2933                        activity_timeout,
2934                        skip_timeout,
2935                        max_fetch_count: 1,
2936                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2937                        fetch_concurrent: 1,
2938                        replay_buffer: NZUsize!(1024 * 1024),
2939                        write_buffer: NZUsize!(1024 * 1024),
2940                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2941                    };
2942                    let engine = Engine::new(context.with_label("engine"), cfg);
2943                    engine.start(pending, recovered, resolver);
2944                }
2945            }
2946
2947            // Wait for all engines to finish
2948            let mut finalizers = Vec::new();
2949            for supervisor in supervisors.iter_mut() {
2950                let (mut latest, mut monitor) = supervisor.subscribe().await;
2951                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2952                    while latest < required_containers {
2953                        latest = monitor.next().await.expect("event missing");
2954                    }
2955                }));
2956            }
2957            join_all(finalizers).await;
2958
2959            // Check supervisors for correct activity
2960            let byz = &validators[0];
2961            let mut count_nullify_and_finalize = 0;
2962            for supervisor in supervisors.iter() {
2963                // Ensure only faults for byz
2964                {
2965                    let faults = supervisor.faults.lock().unwrap();
2966                    assert_eq!(faults.len(), 1);
2967                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2968                    for (_, faults) in faulter.iter() {
2969                        for fault in faults.iter() {
2970                            match fault {
2971                                Activity::NullifyFinalize(_) => {
2972                                    count_nullify_and_finalize += 1;
2973                                }
2974                                _ => panic!("unexpected fault: {fault:?}"),
2975                            }
2976                        }
2977                    }
2978                }
2979
2980                // Ensure no invalid signatures
2981                {
2982                    let invalid = supervisor.invalid.lock().unwrap();
2983                    assert_eq!(*invalid, 0);
2984                }
2985            }
2986            assert!(count_nullify_and_finalize > 0);
2987
2988            // Ensure nullifier is blocked
2989            let blocked = oracle.blocked().await.unwrap();
2990            assert!(!blocked.is_empty());
2991            for (a, b) in blocked {
2992                assert_ne!(&a, byz);
2993                assert_eq!(&b, byz);
2994            }
2995        });
2996    }
2997
2998    #[test_traced]
2999    #[ignore]
3000    fn test_nuller() {
3001        for seed in 0..5 {
3002            nuller::<MinPk>(seed);
3003            nuller::<MinSig>(seed);
3004        }
3005    }
3006
3007    fn outdated<V: Variant>(seed: u64) {
3008        // Create context
3009        let n = 4;
3010        let threshold = quorum(n);
3011        let required_containers = 100;
3012        let activity_timeout = 10;
3013        let skip_timeout = 5;
3014        let namespace = b"consensus".to_vec();
3015        let cfg = deterministic::Config::new()
3016            .with_seed(seed)
3017            .with_timeout(Some(Duration::from_secs(30)));
3018        let executor = deterministic::Runner::new(cfg);
3019        executor.start(|mut context| async move {
3020            // Create simulated network
3021            let (network, mut oracle) = Network::new(
3022                context.with_label("network"),
3023                Config {
3024                    max_size: 1024 * 1024,
3025                },
3026            );
3027
3028            // Start network
3029            network.start();
3030
3031            // Register participants
3032            let mut schemes = Vec::new();
3033            let mut validators = Vec::new();
3034            for i in 0..n {
3035                let scheme = PrivateKey::from_seed(i as u64);
3036                let pk = scheme.public_key();
3037                schemes.push(scheme);
3038                validators.push(pk);
3039            }
3040            validators.sort();
3041            schemes.sort_by_key(|s| s.public_key());
3042            let mut registrations = register_validators(&mut oracle, &validators).await;
3043
3044            // Link all validators
3045            let link = Link {
3046                latency: Duration::from_millis(10),
3047                jitter: Duration::from_millis(1),
3048                success_rate: 1.0,
3049            };
3050            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3051
3052            // Derive threshold
3053            let (polynomial, shares) =
3054                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3055
3056            // Create engines
3057            let relay = Arc::new(mocks::relay::Relay::new());
3058            let mut supervisors = Vec::new();
3059            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
3060                // Create scheme context
3061                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3062
3063                // Start engine
3064                let validator = scheme.public_key();
3065                let mut participants = BTreeMap::new();
3066                participants.insert(
3067                    0,
3068                    (
3069                        polynomial.clone(),
3070                        validators.clone(),
3071                        Some(shares[idx_scheme].clone()),
3072                    ),
3073                );
3074                let supervisor_config = mocks::supervisor::Config::<_, V> {
3075                    namespace: namespace.clone(),
3076                    participants,
3077                };
3078                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3079                let (pending, recovered, resolver) = registrations
3080                    .remove(&validator)
3081                    .expect("validator should be registered");
3082                if idx_scheme == 0 {
3083                    let cfg = mocks::outdated::Config {
3084                        supervisor,
3085                        namespace: namespace.clone(),
3086                        view_delta: activity_timeout * 4,
3087                    };
3088                    let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
3089                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3090                    engine.start(pending);
3091                } else {
3092                    supervisors.push(supervisor.clone());
3093                    let application_cfg = mocks::application::Config {
3094                        hasher: Sha256::default(),
3095                        relay: relay.clone(),
3096                        participant: validator.clone(),
3097                        propose_latency: (10.0, 5.0),
3098                        verify_latency: (10.0, 5.0),
3099                    };
3100                    let (actor, application) = mocks::application::Application::new(
3101                        context.with_label("application"),
3102                        application_cfg,
3103                    );
3104                    actor.start();
3105                    let blocker = oracle.control(scheme.public_key());
3106                    let cfg = config::Config {
3107                        crypto: scheme,
3108                        blocker,
3109                        automaton: application.clone(),
3110                        relay: application.clone(),
3111                        reporter: supervisor.clone(),
3112                        supervisor,
3113                        partition: validator.to_string(),
3114                        mailbox_size: 1024,
3115                        namespace: namespace.clone(),
3116                        leader_timeout: Duration::from_secs(1),
3117                        notarization_timeout: Duration::from_secs(2),
3118                        nullify_retry: Duration::from_secs(10),
3119                        fetch_timeout: Duration::from_secs(1),
3120                        activity_timeout,
3121                        skip_timeout,
3122                        max_fetch_count: 1,
3123                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3124                        fetch_concurrent: 1,
3125                        replay_buffer: NZUsize!(1024 * 1024),
3126                        write_buffer: NZUsize!(1024 * 1024),
3127                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3128                    };
3129                    let engine = Engine::new(context.with_label("engine"), cfg);
3130                    engine.start(pending, recovered, resolver);
3131                }
3132            }
3133
3134            // Wait for all engines to finish
3135            let mut finalizers = Vec::new();
3136            for supervisor in supervisors.iter_mut() {
3137                let (mut latest, mut monitor) = supervisor.subscribe().await;
3138                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3139                    while latest < required_containers {
3140                        latest = monitor.next().await.expect("event missing");
3141                    }
3142                }));
3143            }
3144            join_all(finalizers).await;
3145
3146            // Check supervisors for correct activity
3147            for supervisor in supervisors.iter() {
3148                // Ensure no faults
3149                {
3150                    let faults = supervisor.faults.lock().unwrap();
3151                    assert!(faults.is_empty());
3152                }
3153
3154                // Ensure no invalid signatures
3155                {
3156                    let invalid = supervisor.invalid.lock().unwrap();
3157                    assert_eq!(*invalid, 0);
3158                }
3159            }
3160
3161            // Ensure no blocked connections
3162            let blocked = oracle.blocked().await.unwrap();
3163            assert!(blocked.is_empty());
3164        });
3165    }
3166
3167    #[test_traced]
3168    #[ignore]
3169    fn test_outdated() {
3170        for seed in 0..5 {
3171            outdated::<MinPk>(seed);
3172            outdated::<MinSig>(seed);
3173        }
3174    }
3175
3176    fn run_1k<V: Variant>() {
3177        // Create context
3178        let n = 10;
3179        let threshold = quorum(n);
3180        let required_containers = 1_000;
3181        let activity_timeout = 10;
3182        let skip_timeout = 5;
3183        let namespace = b"consensus".to_vec();
3184        let cfg = deterministic::Config::new();
3185        let executor = deterministic::Runner::new(cfg);
3186        executor.start(|mut context| async move {
3187            // Create simulated network
3188            let (network, mut oracle) = Network::new(
3189                context.with_label("network"),
3190                Config {
3191                    max_size: 1024 * 1024,
3192                },
3193            );
3194
3195            // Start network
3196            network.start();
3197
3198            // Register participants
3199            let mut schemes = Vec::new();
3200            let mut validators = Vec::new();
3201            for i in 0..n {
3202                let scheme = PrivateKey::from_seed(i as u64);
3203                let pk = scheme.public_key();
3204                schemes.push(scheme);
3205                validators.push(pk);
3206            }
3207            validators.sort();
3208            schemes.sort_by_key(|s| s.public_key());
3209            let mut registrations = register_validators(&mut oracle, &validators).await;
3210
3211            // Link all validators
3212            let link = Link {
3213                latency: Duration::from_millis(80),
3214                jitter: Duration::from_millis(10),
3215                success_rate: 0.98,
3216            };
3217            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3218
3219            // Derive threshold
3220            let (polynomial, shares) =
3221                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3222
3223            // Create engines
3224            let relay = Arc::new(mocks::relay::Relay::new());
3225            let mut supervisors = Vec::new();
3226            let mut engine_handlers = Vec::new();
3227            for (idx, scheme) in schemes.into_iter().enumerate() {
3228                // Create scheme context
3229                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3230
3231                // Configure engine
3232                let validator = scheme.public_key();
3233                let mut participants = BTreeMap::new();
3234                participants.insert(
3235                    0,
3236                    (
3237                        polynomial.clone(),
3238                        validators.clone(),
3239                        Some(shares[idx].clone()),
3240                    ),
3241                );
3242                let supervisor_config = mocks::supervisor::Config::<_, V> {
3243                    namespace: namespace.clone(),
3244                    participants,
3245                };
3246                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3247                supervisors.push(supervisor.clone());
3248                let application_cfg = mocks::application::Config {
3249                    hasher: Sha256::default(),
3250                    relay: relay.clone(),
3251                    participant: validator.clone(),
3252                    propose_latency: (100.0, 50.0),
3253                    verify_latency: (50.0, 40.0),
3254                };
3255                let (actor, application) = mocks::application::Application::new(
3256                    context.with_label("application"),
3257                    application_cfg,
3258                );
3259                actor.start();
3260                let blocker = oracle.control(scheme.public_key());
3261                let cfg = config::Config {
3262                    crypto: scheme,
3263                    blocker,
3264                    automaton: application.clone(),
3265                    relay: application.clone(),
3266                    reporter: supervisor.clone(),
3267                    supervisor,
3268                    partition: validator.to_string(),
3269                    mailbox_size: 1024,
3270                    namespace: namespace.clone(),
3271                    leader_timeout: Duration::from_secs(1),
3272                    notarization_timeout: Duration::from_secs(2),
3273                    nullify_retry: Duration::from_secs(10),
3274                    fetch_timeout: Duration::from_secs(1),
3275                    activity_timeout,
3276                    skip_timeout,
3277                    max_fetch_count: 1,
3278                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3279                    fetch_concurrent: 1,
3280                    replay_buffer: NZUsize!(1024 * 1024),
3281                    write_buffer: NZUsize!(1024 * 1024),
3282                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3283                };
3284                let engine = Engine::new(context.with_label("engine"), cfg);
3285
3286                // Start engine
3287                let (pending, recovered, resolver) = registrations
3288                    .remove(&validator)
3289                    .expect("validator should be registered");
3290                engine_handlers.push(engine.start(pending, recovered, resolver));
3291            }
3292
3293            // Wait for all engines to finish
3294            let mut finalizers = Vec::new();
3295            for supervisor in supervisors.iter_mut() {
3296                let (mut latest, mut monitor) = supervisor.subscribe().await;
3297                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3298                    while latest < required_containers {
3299                        latest = monitor.next().await.expect("event missing");
3300                    }
3301                }));
3302            }
3303            join_all(finalizers).await;
3304
3305            // Check supervisors for correct activity
3306            for supervisor in supervisors.iter() {
3307                // Ensure no faults
3308                {
3309                    let faults = supervisor.faults.lock().unwrap();
3310                    assert!(faults.is_empty());
3311                }
3312
3313                // Ensure no invalid signatures
3314                {
3315                    let invalid = supervisor.invalid.lock().unwrap();
3316                    assert_eq!(*invalid, 0);
3317                }
3318            }
3319
3320            // Ensure no blocked connections
3321            let blocked = oracle.blocked().await.unwrap();
3322            assert!(blocked.is_empty());
3323        })
3324    }
3325
3326    #[test_traced]
3327    #[ignore]
3328    fn test_1k() {
3329        run_1k::<MinPk>();
3330        run_1k::<MinSig>();
3331    }
3332
3333    fn tle<V: Variant>() {
3334        // Create context
3335        let n = 4;
3336        let threshold = quorum(n);
3337        let namespace = b"consensus".to_vec();
3338        let activity_timeout = 100;
3339        let skip_timeout = 50;
3340        let executor = deterministic::Runner::timed(Duration::from_secs(30));
3341        executor.start(|mut context| async move {
3342            // Create simulated network
3343            let (network, mut oracle) = Network::new(
3344                context.with_label("network"),
3345                Config {
3346                    max_size: 1024 * 1024,
3347                },
3348            );
3349
3350            // Start network
3351            network.start();
3352
3353            // Register participants
3354            let mut schemes = Vec::new();
3355            let mut validators = Vec::new();
3356            for i in 0..n {
3357                let scheme = PrivateKey::from_seed(i as u64);
3358                let pk = scheme.public_key();
3359                schemes.push(scheme);
3360                validators.push(pk);
3361            }
3362            validators.sort();
3363            schemes.sort_by_key(|s| s.public_key());
3364            let mut registrations = register_validators(&mut oracle, &validators).await;
3365
3366            // Link all validators
3367            let link = Link {
3368                latency: Duration::from_millis(10),
3369                jitter: Duration::from_millis(5),
3370                success_rate: 1.0,
3371            };
3372            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3373
3374            // Derive threshold
3375            let (polynomial, shares) =
3376                ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3377            let public_key = *public::<V>(&polynomial);
3378
3379            // Create engines and supervisors
3380            let relay = Arc::new(mocks::relay::Relay::new());
3381            let mut supervisors = Vec::new();
3382            let mut engine_handlers = Vec::new();
3383            let monitor_supervisor = Arc::new(Mutex::new(None));
3384            for (idx, scheme) in schemes.into_iter().enumerate() {
3385                // Create scheme context
3386                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3387
3388                // Configure engine
3389                let validator = scheme.public_key();
3390                let mut participants = BTreeMap::new();
3391                participants.insert(
3392                    0,
3393                    (
3394                        polynomial.clone(),
3395                        validators.clone(),
3396                        Some(shares[idx].clone()),
3397                    ),
3398                );
3399
3400                // Store first supervisor for monitoring
3401                let supervisor_config = mocks::supervisor::Config::<_, V> {
3402                    namespace: namespace.clone(),
3403                    participants,
3404                };
3405                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3406                supervisors.push(supervisor.clone());
3407                if idx == 0 {
3408                    *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
3409                }
3410
3411                // Configure application
3412                let application_cfg = mocks::application::Config {
3413                    hasher: Sha256::default(),
3414                    relay: relay.clone(),
3415                    participant: validator.clone(),
3416                    propose_latency: (10.0, 5.0),
3417                    verify_latency: (10.0, 5.0),
3418                };
3419                let (actor, application) = mocks::application::Application::new(
3420                    context.with_label("application"),
3421                    application_cfg,
3422                );
3423                actor.start();
3424                let blocker = oracle.control(scheme.public_key());
3425                let cfg = config::Config {
3426                    crypto: scheme,
3427                    blocker,
3428                    automaton: application.clone(),
3429                    relay: application.clone(),
3430                    reporter: supervisor.clone(),
3431                    supervisor,
3432                    partition: validator.to_string(),
3433                    mailbox_size: 1024,
3434                    namespace: namespace.clone(),
3435                    leader_timeout: Duration::from_millis(100),
3436                    notarization_timeout: Duration::from_millis(200),
3437                    nullify_retry: Duration::from_millis(500),
3438                    fetch_timeout: Duration::from_millis(100),
3439                    activity_timeout,
3440                    skip_timeout,
3441                    max_fetch_count: 1,
3442                    fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3443                    fetch_concurrent: 1,
3444                    replay_buffer: NZUsize!(1024 * 1024),
3445                    write_buffer: NZUsize!(1024 * 1024),
3446                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3447                };
3448                let engine = Engine::new(context.with_label("engine"), cfg);
3449
3450                // Start engine
3451                let (pending, recovered, resolver) = registrations
3452                    .remove(&validator)
3453                    .expect("validator should be registered");
3454                engine_handlers.push(engine.start(pending, recovered, resolver));
3455            }
3456
3457            // Prepare TLE test data
3458            let target = 10u64; // Encrypt for view 10
3459            let target_bytes = target.to_be_bytes();
3460            let message_content = b"Secret message for future view10"; // 32 bytes
3461            let message = Block::new(*message_content);
3462
3463            // Encrypt message for future view using threshold public key
3464            let seed_namespace = seed_namespace(&namespace);
3465            let ciphertext = encrypt::<_, V>(
3466                &mut context,
3467                public_key,
3468                (Some(&seed_namespace), &target_bytes),
3469                &message,
3470            );
3471
3472            // Wait for consensus to reach the target view and then decrypt
3473            let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
3474            loop {
3475                // Wait for notarization
3476                context.sleep(Duration::from_millis(100)).await;
3477                let notarizations = supervisor.notarizations.lock().unwrap();
3478                let Some(notarization) = notarizations.get(&target) else {
3479                    continue;
3480                };
3481
3482                // Decrypt the message using the seed signature
3483                let seed_signature = notarization.seed_signature;
3484                let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3485                    .expect("Decryption should succeed with valid seed signature");
3486                assert_eq!(
3487                    message.as_ref(),
3488                    decrypted.as_ref(),
3489                    "Decrypted message should match original message"
3490                );
3491                break;
3492            }
3493        });
3494    }
3495
3496    #[test_traced]
3497    fn test_tle() {
3498        tle::<MinPk>();
3499        tle::<MinSig>();
3500    }
3501}