commonware_consensus/threshold_simplex/
mod.rs

1//! [Simplex](crate::simplex)-like BFT agreement with an embedded VRF and succinct consensus certificates.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `threshold-simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal finalization
5//! latency in a partially synchronous setting. Unlike Simplex Consensus, however, `threshold-simplex` employs threshold
6//! cryptography (specifically BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum) to generate both
7//! a bias-resistant beacon (for leader election and post-facto execution randomness) and succinct consensus certificates
8//! (any certificate can be verified with just the static public key of the consensus instance) for each view
9//! with zero message overhead (natively integrated).
10//!
11//! _If you wish to deploy Simplex Consensus but can't employ threshold signatures, see
12//! [Simplex](crate::simplex)._
13//!
14//! # Features
15//!
16//! * Wicked Fast Block Times (2 Network Hops)
17//! * Optimal Finalization Latency (3 Network Hops)
18//! * Externalized Uptime and Fault Proofs
19//! * Decoupled Block Broadcast and Sync
20//! * Flexible Block Format
21//! * Embedded VRF for Leader Election and Post-Facto Execution Randomness
22//! * Succinct Consensus Certificates for Notarization, Nullification, and Finality
23//!
24//! # Design
25//!
26//! ## Architecture
27//!
28//! All logic is split into two components: the `Voter` and the `Resolver` (and the user of `threshold-simplex`
29//! provides `Application`). The `Voter` is responsible for participating in the latest view and the
30//! `Resolver` is responsible for fetching artifacts from previous views required to verify proposed
31//! blocks in the latest view.
32//!
33//! To provide great performance, all interactions between `Voter`, `Resolver`, and `Application` are
34//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
35//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
36//!
37//! ```txt
38//! +---------------+           +---------+            +++++++++++++++
39//! |               |<----------+         +----------->+             +
40//! |  Application  |           |  Voter  |            +    Peers    +
41//! |               +---------->|         |<-----------+             +
42//! +---------------+           +--+------+            +++++++++++++++
43//!                                |   ^
44//!                                |   |
45//!                                |   |
46//!                                |   |
47//!                                v   |
48//!                            +-------+----+          +++++++++++++++
49//!                            |            +--------->+             +
50//!                            |  Resolver  |          +    Peers    +
51//!                            |            |<---------+             +
52//!                            +------------+          +++++++++++++++
53//! ```
54//!
55//! _Application is usually a single object that implements the `Automaton`, `Relay`, `Committer`,
56//! and `ThresholdSupervisor` traits._
57//!
58//! ## Joining Consensus
59//!
60//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
61//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
62//! latest view and begin participating in consensus (assuming it can verify blocks).
63//!
64//! ## Persistence
65//!
66//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
67//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
68//! consensus and messages it generates to a write-ahead log (WAL) implemented by [`Journal`](https://docs.rs/commonware-storage/latest/commonware_storage/journal/index.html).
69//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
70//! on restart (especially in the case of unclean shutdown).
71//!
72//! ## Protocol Description
73//!
74//! ### Specification for View `v`
75//!
76//! Upon entering view `v`:
77//! * Determine leader `l` for view `v`
78//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
79//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
80//! * If leader `l`, broadcast `(part(v), notarize(c,v))`
81//!   * If can't propose container in view `v` because missing notarization/nullification for a
82//!     previous view `v_m`, request `v_m`
83//!
84//! Upon receiving first `(part(v), notarize(c,v))` from `l`:
85//! * Cancel `t_l`
86//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
87//!   between `v` and `v_parent`, verify `c` and broadcast `(part(v), notarize(c,v))`
88//!
89//! Upon receiving `2f+1` `(part(v), notarize(c,v))`:
90//! * Cancel `t_a`
91//! * Mark `c` as notarized
92//! * Broadcast `(seed(v), notarization(c,v))` (even if we have not verified `c`)
93//! * If have not broadcast `(part(v), nullify(v))`, broadcast `finalize(c,v)`
94//! * Enter `v+1`
95//!
96//! Upon receiving `2f+1` `(part(v), nullify(v))`:
97//! * Broadcast `(seed(v), nullification(v))`
98//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
99//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
100//!       instead.
101//! * Enter `v+1`
102//!
103//! Upon receiving `2f+1` `finalize(c,v)`:
104//! * Mark `c` as finalized (and recursively finalize its parents)
105//! * Broadcast `(seed(v), finalization(c,v))` (even if we have not verified `c`)
106//!
107//! Upon `t_l` or `t_a` firing:
108//! * Broadcast `(part(v), nullify(v))`
109//! * Every `t_r` after `(part(v), nullify(v))` broadcast that we are still in view `v`:
110//!    * Rebroadcast `(part(v), nullify(v))` and either `(seed(v-1), notarization(v-1))` or `(seed(v-1), nullification(v-1))`
111//!
112//! #### Embedded VRF
113//!
114//! When broadcasting any `notarize(c,v)` or `nullify(v)` message, a participant must also include a `part(v)` message (a partial
115//! signature over the view `v`). After `2f+1` `notarize(c,v)` or `nullify(v)` messages are collected from unique participants,
116//! `seed(v)` can be recovered. Because `part(v)` is only over the view `v`, the seed derived for a given view `v` is the same regardless of
117//! whether or not a block was notarized in said view `v`.
118//!
119//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
120//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
121//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
122//! is used as a seed in `v`).
123//!
124//! #### Succinct Consensus Certificates
125//!
126//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain partial signatures for a static
127//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
128//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
129//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
130//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
131//! to operate a lite client).
132//!
133//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
134//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
135//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
136//!
137//! ### Deviations from Simplex Consensus
138//!
139//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
140//!   a set of all notarizations/nullifications for all historical blocks.
141//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
142//!   either a "block" or a "dummy block", respectively.
143//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
144//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
145//!   some number of views (again to trigger early view transition for an unresponsive leader).
146//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
147//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
148
149use commonware_utils::Array;
150
151mod encoder;
152mod prover;
153pub use prover::Prover;
154mod wire {
155    include!(concat!(env!("OUT_DIR"), "/threshold_simplex.wire.rs"));
156}
157
158cfg_if::cfg_if! {
159    if #[cfg(not(target_arch = "wasm32"))] {
160        mod actors;
161        mod config;
162        pub use config::Config;
163        mod engine;
164        pub use engine::Engine;
165        mod metrics;
166        mod verifier;
167    }
168}
169
170#[cfg(test)]
171pub mod mocks;
172
173/// View is a monotonically increasing counter that represents the current focus of consensus.
174pub type View = u64;
175
176use crate::Activity;
177
178/// Context is a collection of metadata from consensus about a given payload.
179#[derive(Clone)]
180pub struct Context<D: Array> {
181    /// Current view of consensus.
182    pub view: View,
183
184    /// Parent the payload is built on.
185    ///
186    /// If there is a gap between the current view and the parent view, the participant
187    /// must possess a nullification for each discarded view to safely vote on the proposed
188    /// payload (any view without a nullification may eventually be finalized and skipping
189    /// it would result in a fork).
190    pub parent: (View, D),
191}
192
193/// Notarize a payload at a given view.
194///
195/// ## Clarifications
196/// * Vote for leader is considered a proposal and a vote.
197/// * It is ok to have both a vote for a proposal and the null
198///   container in the same view.
199/// * It is ok to notarize/finalize different proposals in the same view.
200pub const NOTARIZE: Activity = 0;
201/// Finalize a payload at a given view.
202pub const FINALIZE: Activity = 1;
203/// Notarize a payload that conflicts with a previous notarize.
204pub const CONFLICTING_NOTARIZE: Activity = 2;
205/// Finalize a payload that conflicts with a previous finalize.
206pub const CONFLICTING_FINALIZE: Activity = 3;
207/// Nullify and finalize in the same view.
208pub const NULLIFY_AND_FINALIZE: Activity = 4;
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use commonware_cryptography::{
214        bls12381::{dkg::ops, primitives::poly},
215        Ed25519, Scheme, Sha256,
216    };
217    use commonware_macros::{select, test_traced};
218    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
219    use commonware_runtime::{
220        deterministic::{self, Executor},
221        Clock, Metrics, Runner, Spawner,
222    };
223    use commonware_storage::journal::variable::{Config as JConfig, Journal};
224    use commonware_utils::quorum;
225    use engine::Engine;
226    use futures::{channel::mpsc, StreamExt};
227    use governor::Quota;
228    use rand::{rngs::StdRng, Rng, SeedableRng};
229    use std::{
230        collections::{BTreeMap, HashMap, HashSet},
231        num::NonZeroU32,
232        sync::{Arc, Mutex},
233        time::Duration,
234    };
235    use tracing::debug;
236
237    /// Registers all validators using the oracle.
238    async fn register_validators<P: Array>(
239        oracle: &mut Oracle<P>,
240        validators: &[P],
241    ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
242        let mut registrations = HashMap::new();
243        for validator in validators.iter() {
244            let (voter_sender, voter_receiver) =
245                oracle.register(validator.clone(), 0).await.unwrap();
246            let (resolver_sender, resolver_receiver) =
247                oracle.register(validator.clone(), 1).await.unwrap();
248            registrations.insert(
249                validator.clone(),
250                (
251                    (voter_sender, voter_receiver),
252                    (resolver_sender, resolver_receiver),
253                ),
254            );
255        }
256        registrations
257    }
258
259    /// Enum to describe the action to take when linking validators.
260    enum Action {
261        Link(Link),
262        Update(Link), // Unlink and then link
263        Unlink,
264    }
265
266    /// Links (or unlinks) validators using the oracle.
267    ///
268    /// The `action` parameter determines the action (e.g. link, unlink) to take.
269    /// The `restrict_to` function can be used to restrict the linking to certain connections,
270    /// otherwise all validators will be linked to all other validators.
271    async fn link_validators<P: Array>(
272        oracle: &mut Oracle<P>,
273        validators: &[P],
274        action: Action,
275        restrict_to: Option<fn(usize, usize, usize) -> bool>,
276    ) {
277        for (i1, v1) in validators.iter().enumerate() {
278            for (i2, v2) in validators.iter().enumerate() {
279                // Ignore self
280                if v2 == v1 {
281                    continue;
282                }
283
284                // Restrict to certain connections
285                if let Some(f) = restrict_to {
286                    if !f(validators.len(), i1, i2) {
287                        continue;
288                    }
289                }
290
291                // Do any unlinking first
292                match action {
293                    Action::Update(_) | Action::Unlink => {
294                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
295                    }
296                    _ => {}
297                }
298
299                // Do any linking after
300                match action {
301                    Action::Link(ref link) | Action::Update(ref link) => {
302                        oracle
303                            .add_link(v1.clone(), v2.clone(), link.clone())
304                            .await
305                            .unwrap();
306                    }
307                    _ => {}
308                }
309            }
310        }
311    }
312
313    #[test_traced]
314    fn test_all_online() {
315        // Create context
316        let n = 5;
317        let threshold = quorum(n).expect("unable to calculate threshold");
318        let max_exceptions = 4;
319        let required_containers = 100;
320        let activity_timeout = 10;
321        let namespace = b"consensus".to_vec();
322        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
323        executor.start(async move {
324            // Create simulated network
325            let (network, mut oracle) = Network::new(
326                context.with_label("network"),
327                Config {
328                    max_size: 1024 * 1024,
329                },
330            );
331
332            // Start network
333            network.start();
334
335            // Register participants
336            let mut schemes = Vec::new();
337            let mut validators = Vec::new();
338            for i in 0..n {
339                let scheme = Ed25519::from_seed(i as u64);
340                let pk = scheme.public_key();
341                schemes.push(scheme);
342                validators.push(pk);
343            }
344            validators.sort();
345            schemes.sort_by_key(|s| s.public_key());
346            let mut registrations = register_validators(&mut oracle, &validators).await;
347
348            // Link all validators
349            let link = Link {
350                latency: 10.0,
351                jitter: 1.0,
352                success_rate: 1.0,
353            };
354            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
355
356            // Derive threshold
357            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
358            let pk = poly::public(&public);
359            let prover = Prover::new(pk, &namespace);
360
361            // Create engines
362            let relay = Arc::new(mocks::relay::Relay::new());
363            let mut supervisors = Vec::new();
364            let (done_sender, mut done_receiver) = mpsc::unbounded();
365            let mut engine_handlers = Vec::new();
366            for (idx, scheme) in schemes.into_iter().enumerate() {
367                // Create scheme context
368                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
369
370                // Configure engine
371                let validator = scheme.public_key();
372                let mut participants = BTreeMap::new();
373                participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
374                let supervisor_config = mocks::supervisor::Config {
375                    prover: prover.clone(),
376                    participants,
377                };
378                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
379                supervisors.push(supervisor.clone());
380                let application_cfg = mocks::application::Config {
381                    hasher: Sha256::default(),
382                    relay: relay.clone(),
383                    participant: validator.clone(),
384                    tracker: done_sender.clone(),
385                    propose_latency: (10.0, 5.0),
386                    verify_latency: (10.0, 5.0),
387                };
388                let (actor, application) = mocks::application::Application::new(
389                    context.with_label("application"),
390                    application_cfg,
391                );
392                actor.start();
393                let cfg = JConfig {
394                    partition: validator.to_string(),
395                };
396                let journal = Journal::init(context.with_label("journal"), cfg)
397                    .await
398                    .expect("unable to create journal");
399                let cfg = config::Config {
400                    crypto: scheme,
401                    automaton: application.clone(),
402                    relay: application.clone(),
403                    committer: application,
404                    supervisor,
405                    mailbox_size: 1024,
406                    namespace: namespace.clone(),
407                    leader_timeout: Duration::from_secs(1),
408                    notarization_timeout: Duration::from_secs(2),
409                    nullify_retry: Duration::from_secs(10),
410                    fetch_timeout: Duration::from_secs(1),
411                    activity_timeout,
412                    max_fetch_count: 1,
413                    max_fetch_size: 1024 * 512,
414                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
415                    fetch_concurrent: 1,
416                    replay_concurrency: 1,
417                };
418                let engine = Engine::new(context.with_label("engine"), journal, cfg);
419
420                // Start engine
421                let (voter, resolver) = registrations
422                    .remove(&validator)
423                    .expect("validator should be registered");
424                engine_handlers.push(engine.start(voter, resolver));
425            }
426
427            // Wait for all engines to finish
428            let mut completed = HashSet::new();
429            let mut finalized = HashMap::new();
430            loop {
431                let (validator, event) = done_receiver.next().await.unwrap();
432                if let mocks::application::Progress::Finalized(proof, digest) = event {
433                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
434                    if digest != payload {
435                        panic!(
436                            "finalization mismatch digest: {:?}, payload: {:?}",
437                            digest, payload
438                        );
439                    }
440                    if let Some(previous) = finalized.insert(view, digest.clone()) {
441                        if previous != digest {
442                            panic!(
443                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
444                                view, previous, digest
445                            );
446                        }
447                    }
448                    if (finalized.len() as u64) < required_containers {
449                        continue;
450                    }
451                    completed.insert(validator);
452                }
453                if completed.len() == n as usize {
454                    break;
455                }
456            }
457
458            // Check supervisors for correct activity
459            let latest_complete = required_containers - activity_timeout;
460            for supervisor in supervisors.iter() {
461                // Ensure no faults
462                {
463                    let faults = supervisor.faults.lock().unwrap();
464                    assert!(faults.is_empty());
465                }
466
467                // Ensure no forks
468                let mut exceptions = 0;
469                {
470                    let notarizes = supervisor.notarizes.lock().unwrap();
471                    for (view, payloads) in notarizes.iter() {
472                        // Ensure only one payload proposed per view
473                        if payloads.len() > 1 {
474                            panic!("view: {}", view);
475                        }
476
477                        // Only check at views below timeout
478                        if *view > latest_complete {
479                            continue;
480                        }
481
482                        // Ensure everyone participating
483                        let digest = finalized.get(view).expect("view should be finalized");
484                        let voters = payloads.get(digest).expect("digest should exist");
485                        if voters.len() < threshold as usize {
486                            // We can't verify that everyone participated at every view because some nodes may
487                            // have started later.
488                            panic!("view: {}", view);
489                        }
490                        if voters.len() != n as usize {
491                            exceptions += 1;
492                        }
493                    }
494                }
495                {
496                    let finalizes = supervisor.finalizes.lock().unwrap();
497                    for (view, payloads) in finalizes.iter() {
498                        // Ensure only one payload proposed per view
499                        if payloads.len() > 1 {
500                            panic!("view: {}", view);
501                        }
502
503                        // Only check at views below timeout
504                        if *view > latest_complete {
505                            continue;
506                        }
507
508                        // Ensure everyone participating
509                        let digest = finalized.get(view).expect("view should be finalized");
510                        let finalizers = payloads.get(digest).expect("digest should exist");
511                        if finalizers.len() < threshold as usize {
512                            // We can't verify that everyone participated at every view because some nodes may
513                            // have started later.
514                            panic!("view: {}", view);
515                        }
516                        if finalizers.len() != n as usize {
517                            exceptions += 1;
518                        }
519                    }
520                }
521
522                // Ensure exceptions within allowed
523                assert!(exceptions <= max_exceptions);
524            }
525        });
526    }
527
528    #[test_traced]
529    fn test_unclean_shutdown() {
530        // Create context
531        let n = 5;
532        let threshold = quorum(n).expect("unable to calculate threshold");
533        let required_containers = 100;
534        let activity_timeout = 10;
535        let namespace = b"consensus".to_vec();
536
537        // Derive threshold
538        let mut rng = StdRng::seed_from_u64(0);
539        let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);
540        let pk = poly::public(&public);
541
542        // Random restarts every x seconds
543        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
544        let notarized = Arc::new(Mutex::new(HashMap::new()));
545        let finalized = Arc::new(Mutex::new(HashMap::new()));
546        let completed = Arc::new(Mutex::new(HashSet::new()));
547        let supervised = Arc::new(Mutex::new(Vec::new()));
548        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(300));
549        while completed.lock().unwrap().len() != n as usize {
550            let namespace = namespace.clone();
551            let shutdowns = shutdowns.clone();
552            let notarized = notarized.clone();
553            let finalized = finalized.clone();
554            let completed = completed.clone();
555            let supervised = supervised.clone();
556            executor.start({
557                let mut context = context.clone();
558                let public = public.clone();
559                let shares = shares.clone();
560                async move {
561                // Create simulated network
562                let (network, mut oracle) = Network::new(
563                    context.with_label("network"),
564                    Config {
565                        max_size: 1024 * 1024,
566                    },
567                );
568
569                // Start network
570                network.start();
571
572                // Register participants
573                let mut schemes = Vec::new();
574                let mut validators = Vec::new();
575                for i in 0..n {
576                    let scheme = Ed25519::from_seed(i as u64);
577                    let pk = scheme.public_key();
578                    schemes.push(scheme);
579                    validators.push(pk);
580                }
581                validators.sort();
582                schemes.sort_by_key(|s| s.public_key());
583                let mut registrations = register_validators(&mut oracle, &validators).await;
584
585                // Link all validators
586                let link = Link {
587                    latency: 50.0,
588                    jitter: 50.0,
589                    success_rate: 1.0,
590                };
591                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
592
593                // Create engines
594                let prover = Prover::new(pk, &namespace);
595                let relay = Arc::new(mocks::relay::Relay::new());
596                let mut supervisors = HashMap::new();
597                let (done_sender, mut done_receiver) = mpsc::unbounded();
598                let mut engine_handlers = Vec::new();
599                for (idx, scheme) in schemes.into_iter().enumerate() {
600                    // Create scheme context
601                    let context = context
602                        .clone()
603                        .with_label(&format!("validator-{}", scheme.public_key()));
604
605                    // Configure engine
606                    let validator = scheme.public_key();
607                    let mut participants = BTreeMap::new();
608                    participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
609                    let supervisor_config = mocks::supervisor::Config {
610                        prover: prover.clone(),
611                        participants,
612                    };
613                    let supervisor =
614                        mocks::supervisor::Supervisor::new(supervisor_config);
615                    supervisors.insert(validator.clone(), supervisor.clone());
616                    let application_cfg = mocks::application::Config {
617                        hasher: Sha256::default(),
618                        relay: relay.clone(),
619                        participant: validator.clone(),
620                        tracker: done_sender.clone(),
621                        propose_latency: (10.0, 5.0),
622                        verify_latency: (10.0, 5.0),
623                    };
624                    let (actor, application) =
625                        mocks::application::Application::new(context.with_label("application"), application_cfg);
626                    actor.start();
627                    let cfg = JConfig {
628                        partition: validator.to_string(),
629                    };
630                    let journal = Journal::init(context.with_label("journal"), cfg)
631                        .await
632                        .expect("unable to create journal");
633                    let cfg = config::Config {
634                        crypto: scheme,
635                        automaton: application.clone(),
636                        relay: application.clone(),
637                        committer: application,
638                        supervisor,
639                        mailbox_size: 1024,
640                        namespace: namespace.clone(),
641                        leader_timeout: Duration::from_secs(1),
642                        notarization_timeout: Duration::from_secs(2),
643                        nullify_retry: Duration::from_secs(10),
644                        fetch_timeout: Duration::from_secs(1),
645                        activity_timeout,
646                        max_fetch_count: 1,
647                        max_fetch_size: 1024 * 512,
648                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
649                        fetch_concurrent: 1,
650                        replay_concurrency: 1,
651                    };
652                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
653
654                    // Start engine
655                    let (voter, resolver) = registrations
656                        .remove(&validator)
657                        .expect("validator should be registered");
658                    engine_handlers.push(engine.start(voter, resolver));
659                }
660
661                // Wait for all engines to finish
662                context.with_label("confirmed").spawn(move |_| async move {
663                    loop {
664                        // Parse events
665                        let (validator, event) = done_receiver.next().await.unwrap();
666                        match event {
667                            mocks::application::Progress::Notarized(proof, digest) => {
668                                // Check correctness of proof
669                                let (view, _, payload, _, _) =
670                                    prover.deserialize_notarization(proof).unwrap();
671                                if digest != payload {
672                                    panic!(
673                                        "notarization mismatch digest: {:?}, payload: {:?}",
674                                        digest, payload
675                                    );
676                                }
677
678                                // Store notarized
679                                {
680                                    let mut notarized = notarized.lock().unwrap();
681                                    if let Some(previous) = notarized.insert(view, digest.clone())
682                                    {
683                                        if previous != digest {
684                                            panic!(
685                                                "notarization mismatch at {:?} previous: {:?}, current: {:?}",
686                                                view, previous, digest
687                                            );
688                                        }
689                                    }
690                                    if (notarized.len() as u64) < required_containers {
691                                        continue;
692                                    }
693                                }
694                            }
695                            mocks::application::Progress::Finalized(proof, digest) => {
696                                // Check correctness of proof
697                                let (view, _, payload, _, _) =
698                                    prover.deserialize_finalization(proof).unwrap();
699                                if digest != payload {
700                                    panic!(
701                                        "finalization mismatch digest: {:?}, payload: {:?}",
702                                        digest, payload
703                                    );
704                                }
705
706                                // Store finalized
707                                {
708                                    let mut finalized = finalized.lock().unwrap();
709                                    if let Some(previous) = finalized.insert(view, digest.clone()) {
710                                        if previous != digest {
711                                            panic!(
712                                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
713                                                view, previous, digest
714                                            );
715                                        }
716                                    }
717                                    if (finalized.len() as u64) < required_containers {
718                                        continue;
719                                    }
720                                }
721                                completed.lock().unwrap().insert(validator);
722                            }
723                        }
724                    }
725                });
726
727                // Exit at random points for unclean shutdown of entire set
728                let wait =
729                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
730                context.sleep(wait).await;
731                {
732                    let mut shutdowns = shutdowns.lock().unwrap();
733                    debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
734                    *shutdowns += 1;
735                }
736
737                // Collect supervisors
738                supervised.lock().unwrap().push(supervisors);
739            }});
740
741            // Recover context
742            (executor, context, _) = context.recover();
743        }
744
745        // Check supervisors for faults activity
746        let supervised = supervised.lock().unwrap();
747        for supervisors in supervised.iter() {
748            for (_, supervisor) in supervisors.iter() {
749                let faults = supervisor.faults.lock().unwrap();
750                assert!(faults.is_empty());
751            }
752        }
753    }
754
755    #[test_traced]
756    fn test_backfill() {
757        // Create context
758        let n = 4;
759        let threshold = quorum(n).expect("unable to calculate threshold");
760        let required_containers = 100;
761        let activity_timeout = 10;
762        let namespace = b"consensus".to_vec();
763        let (executor, mut context, _) = Executor::timed(Duration::from_secs(360));
764        executor.start(async move {
765            // Create simulated network
766            let (network, mut oracle) = Network::new(
767                context.with_label("network"),
768                Config {
769                    max_size: 1024 * 1024,
770                },
771            );
772
773            // Start network
774            network.start();
775
776            // Register participants
777            let mut schemes = Vec::new();
778            let mut validators = Vec::new();
779            for i in 0..n {
780                let scheme = Ed25519::from_seed(i as u64);
781                let pk = scheme.public_key();
782                schemes.push(scheme);
783                validators.push(pk);
784            }
785            validators.sort();
786            schemes.sort_by_key(|s| s.public_key());
787            let mut registrations = register_validators(&mut oracle, &validators).await;
788
789            // Link all validators except first
790            let link = Link {
791                latency: 10.0,
792                jitter: 1.0,
793                success_rate: 1.0,
794            };
795            link_validators(
796                &mut oracle,
797                &validators,
798                Action::Link(link),
799                Some(|_, i, j| ![i, j].contains(&0usize)),
800            )
801            .await;
802
803            // Derive threshold
804            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
805            let pk = poly::public(&public);
806            let prover = Prover::new(pk, &namespace);
807
808            // Create engines
809            let relay = Arc::new(mocks::relay::Relay::new());
810            let mut supervisors = Vec::new();
811            let (done_sender, mut done_receiver) = mpsc::unbounded();
812            let mut engine_handlers = Vec::new();
813            for (idx_scheme, scheme) in schemes.iter().enumerate() {
814                // Skip first peer
815                if idx_scheme == 0 {
816                    continue;
817                }
818
819                // Create scheme context
820                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
821
822                // Configure engine
823                let validator = scheme.public_key();
824                let mut participants = BTreeMap::new();
825                participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
826                let supervisor_config = mocks::supervisor::Config {
827                    prover: prover.clone(),
828                    participants,
829                };
830                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
831                supervisors.push(supervisor.clone());
832                let application_cfg = mocks::application::Config {
833                    hasher: Sha256::default(),
834                    relay: relay.clone(),
835                    participant: validator.clone(),
836                    tracker: done_sender.clone(),
837                    propose_latency: (10.0, 5.0),
838                    verify_latency: (10.0, 5.0),
839                };
840                let (actor, application) = mocks::application::Application::new(
841                    context.with_label("application"),
842                    application_cfg,
843                );
844                actor.start();
845                let cfg = JConfig {
846                    partition: validator.to_string(),
847                };
848                let journal = Journal::init(context.with_label("journal"), cfg)
849                    .await
850                    .expect("unable to create journal");
851                let cfg = config::Config {
852                    crypto: scheme.clone(),
853                    automaton: application.clone(),
854                    relay: application.clone(),
855                    committer: application,
856                    supervisor,
857                    mailbox_size: 1024,
858                    namespace: namespace.clone(),
859                    leader_timeout: Duration::from_secs(1),
860                    notarization_timeout: Duration::from_secs(2),
861                    nullify_retry: Duration::from_secs(10),
862                    fetch_timeout: Duration::from_secs(1),
863                    activity_timeout,
864                    max_fetch_count: 1, // force many fetches
865                    max_fetch_size: 1024 * 1024,
866                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
867                    fetch_concurrent: 1,
868                    replay_concurrency: 1,
869                };
870                let engine = Engine::new(context.with_label("engine"), journal, cfg);
871
872                // Start engine
873                let (voter, resolver) = registrations
874                    .remove(&validator)
875                    .expect("validator should be registered");
876                engine_handlers.push(engine.start(voter, resolver));
877            }
878
879            // Wait for all online engines to finish
880            let mut completed = HashSet::new();
881            let mut finalized = HashMap::new();
882            loop {
883                let (validator, event) = done_receiver.next().await.unwrap();
884                if let mocks::application::Progress::Finalized(proof, digest) = event {
885                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
886                    if digest != payload {
887                        panic!(
888                            "finalization mismatch digest: {:?}, payload: {:?}",
889                            digest, payload
890                        );
891                    }
892                    finalized.insert(view, digest);
893                    if (finalized.len() as u64) < required_containers {
894                        continue;
895                    }
896                    completed.insert(validator);
897                }
898                if completed.len() == (n - 1) as usize {
899                    break;
900                }
901            }
902
903            // Degrade network connections for online peers
904            let link = Link {
905                latency: 3_000.0,
906                jitter: 0.0,
907                success_rate: 1.0,
908            };
909            link_validators(
910                &mut oracle,
911                &validators,
912                Action::Update(link.clone()),
913                Some(|_, i, j| ![i, j].contains(&0usize)),
914            )
915            .await;
916
917            // Wait for nullifications to accrue
918            context.sleep(Duration::from_secs(120)).await;
919
920            // Unlink second peer from all (except first)
921            link_validators(
922                &mut oracle,
923                &validators,
924                Action::Unlink,
925                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
926            )
927            .await;
928
929            // Configure engine for first peer
930            let scheme = schemes[0].clone();
931            let validator = scheme.public_key();
932            {
933                // Create scheme context
934                let context = context.with_label(&format!("validator-{}", validator));
935
936                // Link first peer to all (except second)
937                link_validators(
938                    &mut oracle,
939                    &validators,
940                    Action::Link(link),
941                    Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
942                )
943                .await;
944
945                // Restore network connections for all online peers
946                let link = Link {
947                    latency: 10.0,
948                    jitter: 2.5,
949                    success_rate: 1.0,
950                };
951                link_validators(
952                    &mut oracle,
953                    &validators,
954                    Action::Update(link),
955                    Some(|_, i, j| ![i, j].contains(&1usize)),
956                )
957                .await;
958
959                // Configure engine
960                let mut participants = BTreeMap::new();
961                participants.insert(0, (public.clone(), validators.clone(), shares[0]));
962                let supervisor_config = mocks::supervisor::Config {
963                    prover: prover.clone(),
964                    participants,
965                };
966                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
967                supervisors.push(supervisor.clone());
968                let application_cfg = mocks::application::Config {
969                    hasher: Sha256::default(),
970                    relay: relay.clone(),
971                    participant: validator.clone(),
972                    tracker: done_sender.clone(),
973                    propose_latency: (10.0, 5.0),
974                    verify_latency: (10.0, 5.0),
975                };
976                let (actor, application) = mocks::application::Application::new(
977                    context.with_label("application"),
978                    application_cfg,
979                );
980                actor.start();
981                let cfg = JConfig {
982                    partition: validator.to_string(),
983                };
984                let journal = Journal::init(context.with_label("journal"), cfg)
985                    .await
986                    .expect("unable to create journal");
987                let cfg = config::Config {
988                    crypto: scheme,
989                    automaton: application.clone(),
990                    relay: application.clone(),
991                    committer: application,
992                    supervisor,
993                    mailbox_size: 1024,
994                    namespace: namespace.clone(),
995                    leader_timeout: Duration::from_secs(1),
996                    notarization_timeout: Duration::from_secs(2),
997                    nullify_retry: Duration::from_secs(10),
998                    fetch_timeout: Duration::from_secs(1),
999                    activity_timeout,
1000                    max_fetch_count: 1,
1001                    max_fetch_size: 1024 * 512,
1002                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1003                    fetch_concurrent: 1,
1004                    replay_concurrency: 1,
1005                };
1006                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1007
1008                // Start engine
1009                let (voter, resolver) = registrations
1010                    .remove(&validator)
1011                    .expect("validator should be registered");
1012                engine_handlers.push(engine.start(voter, resolver));
1013            }
1014
1015            // Wait for new engine to finalize required
1016            let mut finalized = HashMap::new();
1017            let mut validator_finalized = HashSet::new();
1018            loop {
1019                let (candidate, event) = done_receiver.next().await.unwrap();
1020                if let mocks::application::Progress::Finalized(proof, digest) = event {
1021                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1022                    if digest != payload {
1023                        panic!(
1024                            "finalization mismatch digest: {:?}, payload: {:?}",
1025                            digest, payload
1026                        );
1027                    }
1028                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1029                        if previous != digest {
1030                            panic!(
1031                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1032                                view, previous, digest
1033                            );
1034                        }
1035                    }
1036                    if validator == candidate {
1037                        validator_finalized.insert(view);
1038                    }
1039                }
1040                if validator_finalized.len() == required_containers as usize {
1041                    break;
1042                }
1043            }
1044        });
1045    }
1046
1047    #[test_traced]
1048    fn test_one_offline() {
1049        // Create context
1050        let n = 5;
1051        let threshold = quorum(n).expect("unable to calculate threshold");
1052        let required_containers = 100;
1053        let activity_timeout = 10;
1054        let namespace = b"consensus".to_vec();
1055        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
1056        executor.start(async move {
1057            // Create simulated network
1058            let (network, mut oracle) = Network::new(
1059                context.with_label("network"),
1060                Config {
1061                    max_size: 1024 * 1024,
1062                },
1063            );
1064
1065            // Start network
1066            network.start();
1067
1068            // Register participants
1069            let mut schemes = Vec::new();
1070            let mut validators = Vec::new();
1071            for i in 0..n {
1072                let scheme = Ed25519::from_seed(i as u64);
1073                let pk = scheme.public_key();
1074                schemes.push(scheme);
1075                validators.push(pk);
1076            }
1077            validators.sort();
1078            schemes.sort_by_key(|s| s.public_key());
1079            let mut registrations = register_validators(&mut oracle, &validators).await;
1080
1081            // Link all validators except first
1082            let link = Link {
1083                latency: 10.0,
1084                jitter: 1.0,
1085                success_rate: 1.0,
1086            };
1087            link_validators(
1088                &mut oracle,
1089                &validators,
1090                Action::Link(link),
1091                Some(|_, i, j| ![i, j].contains(&0usize)),
1092            )
1093            .await;
1094
1095            // Derive threshold
1096            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1097            let pk = poly::public(&public);
1098            let prover = Prover::new(pk, &namespace);
1099
1100            // Create engines
1101            let relay = Arc::new(mocks::relay::Relay::new());
1102            let mut supervisors = Vec::new();
1103            let (done_sender, mut done_receiver) = mpsc::unbounded();
1104            let mut engine_handlers = Vec::new();
1105            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1106                // Skip first peer
1107                if idx_scheme == 0 {
1108                    continue;
1109                }
1110
1111                // Create scheme context
1112                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1113
1114                // Configure engine
1115                let validator = scheme.public_key();
1116                let mut participants = BTreeMap::new();
1117                participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
1118                let supervisor_config = mocks::supervisor::Config {
1119                    prover: prover.clone(),
1120                    participants,
1121                };
1122                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1123                supervisors.push(supervisor.clone());
1124                let application_cfg = mocks::application::Config {
1125                    hasher: Sha256::default(),
1126                    relay: relay.clone(),
1127                    participant: validator.clone(),
1128                    tracker: done_sender.clone(),
1129                    propose_latency: (10.0, 5.0),
1130                    verify_latency: (10.0, 5.0),
1131                };
1132                let (actor, application) = mocks::application::Application::new(
1133                    context.with_label("application"),
1134                    application_cfg,
1135                );
1136                actor.start();
1137                let cfg = JConfig {
1138                    partition: validator.to_string(),
1139                };
1140                let journal = Journal::init(context.with_label("journal"), cfg)
1141                    .await
1142                    .expect("unable to create journal");
1143                let cfg = config::Config {
1144                    crypto: scheme,
1145                    automaton: application.clone(),
1146                    relay: application.clone(),
1147                    committer: application,
1148                    supervisor,
1149                    mailbox_size: 1024,
1150                    namespace: namespace.clone(),
1151                    leader_timeout: Duration::from_secs(1),
1152                    notarization_timeout: Duration::from_secs(2),
1153                    nullify_retry: Duration::from_secs(10),
1154                    fetch_timeout: Duration::from_secs(1),
1155                    activity_timeout,
1156                    max_fetch_count: 1,
1157                    max_fetch_size: 1024 * 512,
1158                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1159                    fetch_concurrent: 1,
1160                    replay_concurrency: 1,
1161                };
1162                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1163
1164                // Start engine
1165                let (voter, resolver) = registrations
1166                    .remove(&validator)
1167                    .expect("validator should be registered");
1168                engine_handlers.push(engine.start(voter, resolver));
1169            }
1170
1171            // Wait for all engines to finish
1172            let mut completed = HashSet::new();
1173            let mut finalized = HashMap::new();
1174            loop {
1175                let (validator, event) = done_receiver.next().await.unwrap();
1176                if let mocks::application::Progress::Finalized(proof, digest) = event {
1177                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1178                    if digest != payload {
1179                        panic!(
1180                            "finalization mismatch digest: {:?}, payload: {:?}",
1181                            digest, payload
1182                        );
1183                    }
1184                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1185                        if previous != digest {
1186                            panic!(
1187                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1188                                view, previous, digest
1189                            );
1190                        }
1191                    }
1192                    if (finalized.len() as u64) < required_containers {
1193                        continue;
1194                    }
1195                    completed.insert(validator);
1196                }
1197                if completed.len() == (n - 1) as usize {
1198                    break;
1199                }
1200            }
1201
1202            // Check supervisors for correct activity
1203            let offline = &validators[0];
1204            for supervisor in supervisors.iter() {
1205                // Ensure no faults
1206                {
1207                    let faults = supervisor.faults.lock().unwrap();
1208                    assert!(faults.is_empty());
1209                }
1210
1211                // Ensure offline node is never active
1212                {
1213                    let notarizes = supervisor.notarizes.lock().unwrap();
1214                    for (view, payloads) in notarizes.iter() {
1215                        for (_, participants) in payloads.iter() {
1216                            if participants.contains(offline) {
1217                                panic!("view: {}", view);
1218                            }
1219                        }
1220                    }
1221                }
1222                {
1223                    let finalizes = supervisor.finalizes.lock().unwrap();
1224                    for (view, payloads) in finalizes.iter() {
1225                        for (_, finalizers) in payloads.iter() {
1226                            if finalizers.contains(offline) {
1227                                panic!("view: {}", view);
1228                            }
1229                        }
1230                    }
1231                }
1232            }
1233        });
1234    }
1235
1236    #[test_traced]
1237    fn test_slow_validator() {
1238        // Create context
1239        let n = 5;
1240        let threshold = quorum(n).expect("unable to calculate threshold");
1241        let required_containers = 50;
1242        let activity_timeout = 10;
1243        let namespace = b"consensus".to_vec();
1244        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
1245        executor.start(async move {
1246            // Create simulated network
1247            let (network, mut oracle) = Network::new(
1248                context.with_label("network"),
1249                Config {
1250                    max_size: 1024 * 1024,
1251                },
1252            );
1253
1254            // Start network
1255            network.start();
1256
1257            // Register participants
1258            let mut schemes = Vec::new();
1259            let mut validators = Vec::new();
1260            for i in 0..n {
1261                let scheme = Ed25519::from_seed(i as u64);
1262                let pk = scheme.public_key();
1263                schemes.push(scheme);
1264                validators.push(pk);
1265            }
1266            validators.sort();
1267            schemes.sort_by_key(|s| s.public_key());
1268            let mut registrations = register_validators(&mut oracle, &validators).await;
1269
1270            // Link all validators
1271            let link = Link {
1272                latency: 10.0,
1273                jitter: 1.0,
1274                success_rate: 1.0,
1275            };
1276            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1277
1278            // Derive threshold
1279            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1280            let pk = poly::public(&public);
1281            let prover = Prover::new(pk, &namespace);
1282
1283            // Create engines
1284            let relay = Arc::new(mocks::relay::Relay::new());
1285            let mut supervisors = Vec::new();
1286            let (done_sender, mut done_receiver) = mpsc::unbounded();
1287            let mut engine_handlers = Vec::new();
1288            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1289                // Create scheme context
1290                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1291
1292                // Configure engine
1293                let validator = scheme.public_key();
1294                let mut participants = BTreeMap::new();
1295                participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
1296                let supervisor_config = mocks::supervisor::Config {
1297                    prover: prover.clone(),
1298                    participants,
1299                };
1300                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1301                supervisors.push(supervisor.clone());
1302                let application_cfg = if idx_scheme == 0 {
1303                    mocks::application::Config {
1304                        hasher: Sha256::default(),
1305                        relay: relay.clone(),
1306                        participant: validator.clone(),
1307                        tracker: done_sender.clone(),
1308                        propose_latency: (3_000.0, 0.0),
1309                        verify_latency: (3_000.0, 5.0),
1310                    }
1311                } else {
1312                    mocks::application::Config {
1313                        hasher: Sha256::default(),
1314                        relay: relay.clone(),
1315                        participant: validator.clone(),
1316                        tracker: done_sender.clone(),
1317                        propose_latency: (10.0, 5.0),
1318                        verify_latency: (10.0, 5.0),
1319                    }
1320                };
1321                let (actor, application) = mocks::application::Application::new(
1322                    context.with_label("application"),
1323                    application_cfg,
1324                );
1325                actor.start();
1326                let cfg = JConfig {
1327                    partition: validator.to_string(),
1328                };
1329                let journal = Journal::init(context.with_label("journal"), cfg)
1330                    .await
1331                    .expect("unable to create journal");
1332                let cfg = config::Config {
1333                    crypto: scheme,
1334                    automaton: application.clone(),
1335                    relay: application.clone(),
1336                    committer: application,
1337                    supervisor,
1338                    mailbox_size: 1024,
1339                    namespace: namespace.clone(),
1340                    leader_timeout: Duration::from_secs(1),
1341                    notarization_timeout: Duration::from_secs(2),
1342                    nullify_retry: Duration::from_secs(10),
1343                    fetch_timeout: Duration::from_secs(1),
1344                    activity_timeout,
1345                    max_fetch_count: 1,
1346                    max_fetch_size: 1024 * 512,
1347                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1348                    fetch_concurrent: 1,
1349                    replay_concurrency: 1,
1350                };
1351                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1352
1353                // Start engine
1354                let (voter, resolver) = registrations
1355                    .remove(&validator)
1356                    .expect("validator should be registered");
1357                engine_handlers.push(engine.start(voter, resolver));
1358            }
1359
1360            // Wait for all engines to finish
1361            let mut completed = HashSet::new();
1362            let mut finalized = HashMap::new();
1363            loop {
1364                let (validator, event) = done_receiver.next().await.unwrap();
1365                if let mocks::application::Progress::Finalized(proof, digest) = event {
1366                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1367                    if digest != payload {
1368                        panic!(
1369                            "finalization mismatch digest: {:?}, payload: {:?}",
1370                            digest, payload
1371                        );
1372                    }
1373                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1374                        if previous != digest {
1375                            panic!(
1376                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1377                                view, previous, digest
1378                            );
1379                        }
1380                    }
1381                    if (finalized.len() as u64) < required_containers {
1382                        continue;
1383                    }
1384                    completed.insert(validator);
1385                }
1386                if completed.len() == n as usize {
1387                    break;
1388                }
1389            }
1390
1391            // Check supervisors for correct activity
1392            let slow = &validators[0];
1393            for supervisor in supervisors.iter() {
1394                // Ensure no faults
1395                {
1396                    let faults = supervisor.faults.lock().unwrap();
1397                    assert!(faults.is_empty());
1398                }
1399
1400                // Ensure slow node is never active
1401                {
1402                    let notarizes = supervisor.notarizes.lock().unwrap();
1403                    for (view, payloads) in notarizes.iter() {
1404                        for (_, participants) in payloads.iter() {
1405                            if participants.contains(slow) {
1406                                panic!("view: {}", view);
1407                            }
1408                        }
1409                    }
1410                }
1411                {
1412                    let finalizes = supervisor.finalizes.lock().unwrap();
1413                    for (view, payloads) in finalizes.iter() {
1414                        for (_, finalizers) in payloads.iter() {
1415                            if finalizers.contains(slow) {
1416                                panic!("view: {}", view);
1417                            }
1418                        }
1419                    }
1420                }
1421            }
1422        });
1423    }
1424
1425    #[test_traced]
1426    fn test_all_recovery() {
1427        // Create context
1428        let n = 5;
1429        let threshold = quorum(n).expect("unable to calculate threshold");
1430        let required_containers = 100;
1431        let activity_timeout = 10;
1432        let namespace = b"consensus".to_vec();
1433        let (executor, mut context, _) = Executor::timed(Duration::from_secs(120));
1434        executor.start(async move {
1435            // Create simulated network
1436            let (network, mut oracle) = Network::new(
1437                context.with_label("network"),
1438                Config {
1439                    max_size: 1024 * 1024,
1440                },
1441            );
1442
1443            // Start network
1444            network.start();
1445
1446            // Register participants
1447            let mut schemes = Vec::new();
1448            let mut validators = Vec::new();
1449            for i in 0..n {
1450                let scheme = Ed25519::from_seed(i as u64);
1451                let pk = scheme.public_key();
1452                schemes.push(scheme);
1453                validators.push(pk);
1454            }
1455            validators.sort();
1456            schemes.sort_by_key(|s| s.public_key());
1457            let mut registrations = register_validators(&mut oracle, &validators).await;
1458
1459            // Link all validators
1460            let link = Link {
1461                latency: 3_000.0,
1462                jitter: 0.0,
1463                success_rate: 1.0,
1464            };
1465            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1466
1467            // Derive threshold
1468            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1469            let pk = poly::public(&public);
1470            let prover = Prover::new(pk, &namespace);
1471
1472            // Create engines
1473            let relay = Arc::new(mocks::relay::Relay::new());
1474            let mut supervisors = Vec::new();
1475            let (done_sender, mut done_receiver) = mpsc::unbounded();
1476            let mut engine_handlers = Vec::new();
1477            for (idx, scheme) in schemes.iter().enumerate() {
1478                // Create scheme context
1479                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1480
1481                // Configure engine
1482                let validator = scheme.public_key();
1483                let mut participants = BTreeMap::new();
1484                participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1485                let supervisor_config = mocks::supervisor::Config {
1486                    prover: prover.clone(),
1487                    participants,
1488                };
1489                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1490                supervisors.push(supervisor.clone());
1491                let application_cfg = mocks::application::Config {
1492                    hasher: Sha256::default(),
1493                    relay: relay.clone(),
1494                    participant: validator.clone(),
1495                    tracker: done_sender.clone(),
1496                    propose_latency: (10.0, 5.0),
1497                    verify_latency: (10.0, 5.0),
1498                };
1499                let (actor, application) = mocks::application::Application::new(
1500                    context.with_label("application"),
1501                    application_cfg,
1502                );
1503                actor.start();
1504                let cfg = JConfig {
1505                    partition: validator.to_string(),
1506                };
1507                let journal = Journal::init(context.with_label("journal"), cfg)
1508                    .await
1509                    .expect("unable to create journal");
1510                let cfg = config::Config {
1511                    crypto: scheme.clone(),
1512                    automaton: application.clone(),
1513                    relay: application.clone(),
1514                    committer: application,
1515                    supervisor,
1516                    mailbox_size: 1024,
1517                    namespace: namespace.clone(),
1518                    leader_timeout: Duration::from_secs(1),
1519                    notarization_timeout: Duration::from_secs(2),
1520                    nullify_retry: Duration::from_secs(10),
1521                    fetch_timeout: Duration::from_secs(1),
1522                    activity_timeout,
1523                    max_fetch_count: 1,
1524                    max_fetch_size: 1024 * 512,
1525                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1526                    fetch_concurrent: 1,
1527                    replay_concurrency: 1,
1528                };
1529                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1530
1531                // Start engine
1532                let (voter, resolver) = registrations
1533                    .remove(&validator)
1534                    .expect("validator should be registered");
1535                engine_handlers.push(engine.start(voter, resolver));
1536            }
1537
1538            // Wait for a few virtual minutes (shouldn't finalize anything)
1539            select! {
1540                _timeout = context.sleep(Duration::from_secs(60)) => {},
1541                _done = done_receiver.next() => {
1542                    panic!("engine should not notarize or finalize anything");
1543                }
1544            }
1545
1546            // Update links
1547            let link = Link {
1548                latency: 10.0,
1549                jitter: 1.0,
1550                success_rate: 1.0,
1551            };
1552            link_validators(&mut oracle, &validators, Action::Update(link), None).await;
1553
1554            // Wait for all engines to finish
1555            let mut completed = HashSet::new();
1556            let mut finalized = HashMap::new();
1557            loop {
1558                let (validator, event) = done_receiver.next().await.unwrap();
1559                if let mocks::application::Progress::Finalized(proof, digest) = event {
1560                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1561                    if digest != payload {
1562                        panic!(
1563                            "finalization mismatch digest: {:?}, payload: {:?}",
1564                            digest, payload
1565                        );
1566                    }
1567                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1568                        if previous != digest {
1569                            panic!(
1570                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1571                                view, previous, digest
1572                            );
1573                        }
1574                    }
1575                    if (finalized.len() as u64) < required_containers {
1576                        continue;
1577                    }
1578                    completed.insert(validator);
1579                }
1580                if completed.len() == n as usize {
1581                    break;
1582                }
1583            }
1584
1585            // Check supervisors for correct activity
1586            for supervisor in supervisors.iter() {
1587                // Ensure no faults
1588                {
1589                    let faults = supervisor.faults.lock().unwrap();
1590                    assert!(faults.is_empty());
1591                }
1592            }
1593        });
1594    }
1595
1596    #[test_traced]
1597    fn test_partition() {
1598        // Create context
1599        let n = 10;
1600        let threshold = quorum(n).expect("unable to calculate threshold");
1601        let required_containers = 50;
1602        let activity_timeout = 10;
1603        let namespace = b"consensus".to_vec();
1604        let (executor, mut context, _) = Executor::timed(Duration::from_secs(900));
1605        executor.start(async move {
1606            // Create simulated network
1607            let (network, mut oracle) = Network::new(
1608                context.with_label("network"),
1609                Config {
1610                    max_size: 1024 * 1024,
1611                },
1612            );
1613
1614            // Start network
1615            network.start();
1616
1617            // Register participants
1618            let mut schemes = Vec::new();
1619            let mut validators = Vec::new();
1620            for i in 0..n {
1621                let scheme = Ed25519::from_seed(i as u64);
1622                let pk = scheme.public_key();
1623                schemes.push(scheme);
1624                validators.push(pk);
1625            }
1626            validators.sort();
1627            schemes.sort_by_key(|s| s.public_key());
1628            let mut registrations = register_validators(&mut oracle, &validators).await;
1629
1630            // Link all validators
1631            let link = Link {
1632                latency: 10.0,
1633                jitter: 1.0,
1634                success_rate: 1.0,
1635            };
1636            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1637
1638            // Derive threshold
1639            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1640            let pk = poly::public(&public);
1641            let prover = Prover::new(pk, &namespace);
1642
1643            // Create engines
1644            let relay = Arc::new(mocks::relay::Relay::new());
1645            let mut supervisors = Vec::new();
1646            let (done_sender, mut done_receiver) = mpsc::unbounded();
1647            let mut engine_handlers = Vec::new();
1648            for (idx, scheme) in schemes.iter().enumerate() {
1649                // Create scheme context
1650                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1651
1652                // Configure engine
1653                let validator = scheme.public_key();
1654                let mut participants = BTreeMap::new();
1655                participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1656                let supervisor_config = mocks::supervisor::Config {
1657                    prover: prover.clone(),
1658                    participants,
1659                };
1660                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1661                supervisors.push(supervisor.clone());
1662                let application_cfg = mocks::application::Config {
1663                    hasher: Sha256::default(),
1664                    relay: relay.clone(),
1665                    participant: validator.clone(),
1666                    tracker: done_sender.clone(),
1667                    propose_latency: (10.0, 5.0),
1668                    verify_latency: (10.0, 5.0),
1669                };
1670                let (actor, application) = mocks::application::Application::new(
1671                    context.with_label("application"),
1672                    application_cfg,
1673                );
1674                actor.start();
1675                let cfg = JConfig {
1676                    partition: validator.to_string(),
1677                };
1678                let journal = Journal::init(context.with_label("journal"), cfg)
1679                    .await
1680                    .expect("unable to create journal");
1681                let cfg = config::Config {
1682                    crypto: scheme.clone(),
1683                    automaton: application.clone(),
1684                    relay: application.clone(),
1685                    committer: application,
1686                    supervisor,
1687                    mailbox_size: 1024,
1688                    namespace: namespace.clone(),
1689                    leader_timeout: Duration::from_secs(1),
1690                    notarization_timeout: Duration::from_secs(2),
1691                    nullify_retry: Duration::from_secs(10),
1692                    fetch_timeout: Duration::from_secs(1),
1693                    activity_timeout,
1694                    max_fetch_count: 1,
1695                    max_fetch_size: 1024 * 512,
1696                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1697                    fetch_concurrent: 1,
1698                    replay_concurrency: 1,
1699                };
1700                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1701
1702                // Start engine
1703                let (voter, resolver) = registrations
1704                    .remove(&validator)
1705                    .expect("validator should be registered");
1706                engine_handlers.push(engine.start(voter, resolver));
1707            }
1708
1709            // Wait for all engines to finish
1710            let mut completed = HashSet::new();
1711            let mut finalized = HashMap::new();
1712            let mut highest_finalized = 0;
1713            loop {
1714                let (validator, event) = done_receiver.next().await.unwrap();
1715                if let mocks::application::Progress::Finalized(proof, digest) = event {
1716                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1717                    if digest != payload {
1718                        panic!(
1719                            "finalization mismatch digest: {:?}, payload: {:?}",
1720                            digest, payload
1721                        );
1722                    }
1723                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1724                        if previous != digest {
1725                            panic!(
1726                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1727                                view, previous, digest
1728                            );
1729                        }
1730                    }
1731                    if view > highest_finalized {
1732                        highest_finalized = view;
1733                    }
1734                    if (finalized.len() as u64) < required_containers {
1735                        continue;
1736                    }
1737                    completed.insert(validator);
1738                }
1739                if completed.len() == n as usize {
1740                    break;
1741                }
1742            }
1743
1744            // Cut all links between validator halves
1745            fn separated(n: usize, a: usize, b: usize) -> bool {
1746                let m = n / 2;
1747                (a < m && b >= m) || (a >= m && b < m)
1748            }
1749            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1750
1751            // Wait for any in-progress notarizations/finalizations to finish
1752            context.sleep(Duration::from_secs(10)).await;
1753
1754            // Empty done receiver
1755            loop {
1756                if done_receiver.try_next().is_err() {
1757                    break;
1758                }
1759            }
1760
1761            // Wait for a few virtual minutes (shouldn't finalize anything)
1762            select! {
1763                _timeout = context.sleep(Duration::from_secs(600)) => {},
1764                _done = done_receiver.next() => {
1765                    panic!("engine should not notarize or finalize anything");
1766                }
1767            }
1768
1769            // Restore links
1770            link_validators(
1771                &mut oracle,
1772                &validators,
1773                Action::Link(link),
1774                Some(separated),
1775            )
1776            .await;
1777
1778            // Wait for all engines to finish
1779            let mut completed = HashSet::new();
1780            loop {
1781                let (validator, event) = done_receiver.next().await.unwrap();
1782                if let mocks::application::Progress::Finalized(proof, digest) = event {
1783                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1784                    if digest != payload {
1785                        panic!(
1786                            "finalization mismatch digest: {:?}, payload: {:?}",
1787                            digest, payload
1788                        );
1789                    }
1790                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1791                        if previous != digest {
1792                            panic!(
1793                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1794                                view, previous, digest
1795                            );
1796                        }
1797                    }
1798                    if (finalized.len() as u64) < required_containers + highest_finalized {
1799                        continue;
1800                    }
1801                    completed.insert(validator);
1802                }
1803                if completed.len() == n as usize {
1804                    break;
1805                }
1806            }
1807
1808            // Check supervisors for correct activity
1809            for supervisor in supervisors.iter() {
1810                // Ensure no faults
1811                {
1812                    let faults = supervisor.faults.lock().unwrap();
1813                    assert!(faults.is_empty());
1814                }
1815            }
1816        });
1817    }
1818
1819    fn slow_and_lossy_links(seed: u64) -> String {
1820        // Create context
1821        let n = 5;
1822        let threshold = quorum(n).expect("unable to calculate threshold");
1823        let required_containers = 50;
1824        let activity_timeout = 10;
1825        let namespace = b"consensus".to_vec();
1826        let cfg = deterministic::Config {
1827            seed,
1828            timeout: Some(Duration::from_secs(3_000)),
1829            ..deterministic::Config::default()
1830        };
1831        let (executor, mut context, auditor) = Executor::init(cfg);
1832        executor.start(async move {
1833            // Create simulated network
1834            let (network, mut oracle) = Network::new(
1835                context.with_label("network"),
1836                Config {
1837                    max_size: 1024 * 1024,
1838                },
1839            );
1840
1841            // Start network
1842            network.start();
1843
1844            // Register participants
1845            let mut schemes = Vec::new();
1846            let mut validators = Vec::new();
1847            for i in 0..n {
1848                let scheme = Ed25519::from_seed(i as u64);
1849                let pk = scheme.public_key();
1850                schemes.push(scheme);
1851                validators.push(pk);
1852            }
1853            validators.sort();
1854            schemes.sort_by_key(|s| s.public_key());
1855            let mut registrations = register_validators(&mut oracle, &validators).await;
1856
1857            // Link all validators
1858            let degraded_link = Link {
1859                latency: 200.0,
1860                jitter: 150.0,
1861                success_rate: 0.5,
1862            };
1863            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1864
1865            // Derive threshold
1866            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1867            let pk = poly::public(&public);
1868            let prover = Prover::new(pk, &namespace);
1869
1870            // Create engines
1871            let relay = Arc::new(mocks::relay::Relay::new());
1872            let mut supervisors = Vec::new();
1873            let (done_sender, mut done_receiver) = mpsc::unbounded();
1874            let mut engine_handlers = Vec::new();
1875            for (idx, scheme) in schemes.into_iter().enumerate() {
1876                // Create scheme context
1877                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1878
1879                // Configure engine
1880                let validator = scheme.public_key();
1881                let mut participants = BTreeMap::new();
1882                participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1883                let supervisor_config = mocks::supervisor::Config {
1884                    prover: prover.clone(),
1885                    participants,
1886                };
1887                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1888                supervisors.push(supervisor.clone());
1889                let application_cfg = mocks::application::Config {
1890                    hasher: Sha256::default(),
1891                    relay: relay.clone(),
1892                    participant: validator.clone(),
1893                    tracker: done_sender.clone(),
1894                    propose_latency: (10.0, 5.0),
1895                    verify_latency: (10.0, 5.0),
1896                };
1897                let (actor, application) = mocks::application::Application::new(
1898                    context.with_label("application"),
1899                    application_cfg,
1900                );
1901                actor.start();
1902                let cfg = JConfig {
1903                    partition: validator.to_string(),
1904                };
1905                let journal = Journal::init(context.with_label("journal"), cfg)
1906                    .await
1907                    .expect("unable to create journal");
1908                let cfg = config::Config {
1909                    crypto: scheme,
1910                    automaton: application.clone(),
1911                    relay: application.clone(),
1912                    committer: application,
1913                    supervisor,
1914                    mailbox_size: 1024,
1915                    namespace: namespace.clone(),
1916                    leader_timeout: Duration::from_secs(1),
1917                    notarization_timeout: Duration::from_secs(2),
1918                    nullify_retry: Duration::from_secs(10),
1919                    fetch_timeout: Duration::from_secs(1),
1920                    activity_timeout,
1921                    max_fetch_count: 1,
1922                    max_fetch_size: 1024 * 512,
1923                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1924                    fetch_concurrent: 1,
1925                    replay_concurrency: 1,
1926                };
1927                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1928
1929                // Start engine
1930                let (voter, resolver) = registrations
1931                    .remove(&validator)
1932                    .expect("validator should be registered");
1933                engine_handlers.push(engine.start(voter, resolver));
1934            }
1935
1936            // Wait for all engines to finish
1937            let mut completed = HashSet::new();
1938            let mut finalized = HashMap::new();
1939            loop {
1940                let (validator, event) = done_receiver.next().await.unwrap();
1941                if let mocks::application::Progress::Finalized(proof, digest) = event {
1942                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1943                    if digest != payload {
1944                        panic!(
1945                            "finalization mismatch digest: {:?}, payload: {:?}",
1946                            digest, payload
1947                        );
1948                    }
1949                    if let Some(previous) = finalized.insert(view, digest.clone()) {
1950                        if previous != digest {
1951                            panic!(
1952                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1953                                view, previous, digest
1954                            );
1955                        }
1956                    }
1957                    if (finalized.len() as u64) < required_containers {
1958                        continue;
1959                    }
1960                    completed.insert(validator);
1961                }
1962                if completed.len() == n as usize {
1963                    break;
1964                }
1965            }
1966
1967            // Check supervisors for correct activity
1968            for supervisor in supervisors.iter() {
1969                // Ensure no faults
1970                {
1971                    let faults = supervisor.faults.lock().unwrap();
1972                    assert!(faults.is_empty());
1973                }
1974            }
1975        });
1976        auditor.state()
1977    }
1978
1979    #[test_traced]
1980    fn test_slow_and_lossy_links() {
1981        slow_and_lossy_links(0);
1982    }
1983
1984    #[test_traced]
1985    fn test_determinism() {
1986        // We use slow and lossy links as the deterministic test
1987        // because it is the most complex test.
1988        for seed in 1..6 {
1989            // Run test with seed
1990            let state_1 = slow_and_lossy_links(seed);
1991
1992            // Run test again with same seed
1993            let state_2 = slow_and_lossy_links(seed);
1994
1995            // Ensure states are equal
1996            assert_eq!(state_1, state_2);
1997        }
1998    }
1999
2000    #[test_traced]
2001    fn test_conflicter() {
2002        // Create context
2003        let n = 4;
2004        let threshold = quorum(n).expect("unable to calculate threshold");
2005        let required_containers = 50;
2006        let activity_timeout = 10;
2007        let namespace = b"consensus".to_vec();
2008        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
2009        executor.start(async move {
2010            // Create simulated network
2011            let (network, mut oracle) = Network::new(
2012                context.with_label("network"),
2013                Config {
2014                    max_size: 1024 * 1024,
2015                },
2016            );
2017
2018            // Start network
2019            network.start();
2020
2021            // Register participants
2022            let mut schemes = Vec::new();
2023            let mut validators = Vec::new();
2024            for i in 0..n {
2025                let scheme = Ed25519::from_seed(i as u64);
2026                let pk = scheme.public_key();
2027                schemes.push(scheme);
2028                validators.push(pk);
2029            }
2030            validators.sort();
2031            schemes.sort_by_key(|s| s.public_key());
2032            let mut registrations = register_validators(&mut oracle, &validators).await;
2033
2034            // Link all validators
2035            let link = Link {
2036                latency: 10.0,
2037                jitter: 1.0,
2038                success_rate: 1.0,
2039            };
2040            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2041
2042            // Derive threshold
2043            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
2044            let pk = poly::public(&public);
2045            let prover = Prover::new(pk, &namespace);
2046
2047            // Create engines
2048            let relay = Arc::new(mocks::relay::Relay::new());
2049            let mut supervisors = Vec::new();
2050            let (done_sender, mut done_receiver) = mpsc::unbounded();
2051            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2052                // Create scheme context
2053                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2054
2055                // Start engine
2056                let validator = scheme.public_key();
2057                let mut participants = BTreeMap::new();
2058                participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
2059                let supervisor_config = mocks::supervisor::Config {
2060                    prover: prover.clone(),
2061                    participants,
2062                };
2063                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2064                let (voter, resolver) = registrations
2065                    .remove(&validator)
2066                    .expect("validator should be registered");
2067                if idx_scheme == 0 {
2068                    let cfg = mocks::conflicter::Config {
2069                        supervisor,
2070                        namespace: namespace.clone(),
2071                    };
2072                    let engine: mocks::conflicter::Conflicter<_, Sha256, _> =
2073                        mocks::conflicter::Conflicter::new(
2074                            context.with_label("byzantine_engine"),
2075                            cfg,
2076                        );
2077                    engine.start(voter);
2078                } else {
2079                    supervisors.push(supervisor.clone());
2080                    let application_cfg = mocks::application::Config {
2081                        hasher: Sha256::default(),
2082                        relay: relay.clone(),
2083                        participant: validator.clone(),
2084                        tracker: done_sender.clone(),
2085                        propose_latency: (10.0, 5.0),
2086                        verify_latency: (10.0, 5.0),
2087                    };
2088                    let (actor, application) = mocks::application::Application::new(
2089                        context.with_label("application"),
2090                        application_cfg,
2091                    );
2092                    actor.start();
2093                    let cfg = JConfig {
2094                        partition: validator.to_string(),
2095                    };
2096                    let journal = Journal::init(context.with_label("journal"), cfg)
2097                        .await
2098                        .expect("unable to create journal");
2099                    let cfg = config::Config {
2100                        crypto: scheme,
2101                        automaton: application.clone(),
2102                        relay: application.clone(),
2103                        committer: application,
2104                        supervisor,
2105                        mailbox_size: 1024,
2106                        namespace: namespace.clone(),
2107                        leader_timeout: Duration::from_secs(1),
2108                        notarization_timeout: Duration::from_secs(2),
2109                        nullify_retry: Duration::from_secs(10),
2110                        fetch_timeout: Duration::from_secs(1),
2111                        activity_timeout,
2112                        max_fetch_count: 1,
2113                        max_fetch_size: 1024 * 512,
2114                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2115                        fetch_concurrent: 1,
2116                        replay_concurrency: 1,
2117                    };
2118                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
2119                    engine.start(voter, resolver);
2120                }
2121            }
2122
2123            // Wait for all engines to finish
2124            let mut completed = HashSet::new();
2125            let mut finalized = HashMap::new();
2126            loop {
2127                let (validator, event) = done_receiver.next().await.unwrap();
2128                if let mocks::application::Progress::Finalized(proof, digest) = event {
2129                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
2130                    if digest != payload {
2131                        panic!(
2132                            "finalization mismatch digest: {:?}, payload: {:?}",
2133                            digest, payload
2134                        );
2135                    }
2136                    if let Some(previous) = finalized.insert(view, digest.clone()) {
2137                        if previous != digest {
2138                            panic!(
2139                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2140                                view, previous, digest
2141                            );
2142                        }
2143                    }
2144                    if (finalized.len() as u64) < required_containers {
2145                        continue;
2146                    }
2147                    completed.insert(validator);
2148                }
2149                if completed.len() == (n - 1) as usize {
2150                    break;
2151                }
2152            }
2153
2154            // Check supervisors for correct activity
2155            let byz = &validators[0];
2156            let mut count_conflicting_notarize = 0;
2157            let mut count_conflicting_finalize = 0;
2158            for supervisor in supervisors.iter() {
2159                // Ensure only faults for byz
2160                {
2161                    let faults = supervisor.faults.lock().unwrap();
2162                    assert_eq!(faults.len(), 1);
2163                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2164                    for (_, faults) in faulter.iter() {
2165                        for fault in faults.iter() {
2166                            match *fault {
2167                                CONFLICTING_NOTARIZE => {
2168                                    count_conflicting_notarize += 1;
2169                                }
2170                                CONFLICTING_FINALIZE => {
2171                                    count_conflicting_finalize += 1;
2172                                }
2173                                _ => panic!("unexpected fault: {:?}", fault),
2174                            }
2175                        }
2176                    }
2177                }
2178            }
2179            assert!(count_conflicting_notarize > 0);
2180            assert!(count_conflicting_finalize > 0);
2181        });
2182    }
2183
2184    #[test_traced]
2185    fn test_nuller() {
2186        // Create context
2187        let n = 4;
2188        let threshold = quorum(n).expect("unable to calculate threshold");
2189        let required_containers = 50;
2190        let activity_timeout = 10;
2191        let namespace = b"consensus".to_vec();
2192        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
2193        executor.start(async move {
2194            // Create simulated network
2195            let (network, mut oracle) = Network::new(
2196                context.with_label("network"),
2197                Config {
2198                    max_size: 1024 * 1024,
2199                },
2200            );
2201
2202            // Start network
2203            network.start();
2204
2205            // Register participants
2206            let mut schemes = Vec::new();
2207            let mut validators = Vec::new();
2208            for i in 0..n {
2209                let scheme = Ed25519::from_seed(i as u64);
2210                let pk = scheme.public_key();
2211                schemes.push(scheme);
2212                validators.push(pk);
2213            }
2214            validators.sort();
2215            schemes.sort_by_key(|s| s.public_key());
2216            let mut registrations = register_validators(&mut oracle, &validators).await;
2217
2218            // Link all validators
2219            let link = Link {
2220                latency: 10.0,
2221                jitter: 1.0,
2222                success_rate: 1.0,
2223            };
2224            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2225
2226            // Derive threshold
2227            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
2228            let pk = poly::public(&public);
2229            let prover = Prover::new(pk, &namespace);
2230
2231            // Create engines
2232            let relay = Arc::new(mocks::relay::Relay::new());
2233            let mut supervisors = Vec::new();
2234            let (done_sender, mut done_receiver) = mpsc::unbounded();
2235            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2236                // Create scheme context
2237                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2238
2239                // Start engine
2240                let validator = scheme.public_key();
2241                let mut participants = BTreeMap::new();
2242                participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
2243                let supervisor_config = mocks::supervisor::Config {
2244                    prover: prover.clone(),
2245                    participants,
2246                };
2247                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2248                let (voter, resolver) = registrations
2249                    .remove(&validator)
2250                    .expect("validator should be registered");
2251                if idx_scheme == 0 {
2252                    let cfg = mocks::nuller::Config {
2253                        supervisor,
2254                        namespace: namespace.clone(),
2255                    };
2256                    let engine: mocks::nuller::Nuller<_, Sha256, _> =
2257                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2258                    engine.start(voter);
2259                } else {
2260                    supervisors.push(supervisor.clone());
2261                    let application_cfg = mocks::application::Config {
2262                        hasher: Sha256::default(),
2263                        relay: relay.clone(),
2264                        participant: validator.clone(),
2265                        tracker: done_sender.clone(),
2266                        propose_latency: (10.0, 5.0),
2267                        verify_latency: (10.0, 5.0),
2268                    };
2269                    let (actor, application) = mocks::application::Application::new(
2270                        context.with_label("application"),
2271                        application_cfg,
2272                    );
2273                    actor.start();
2274                    let cfg = JConfig {
2275                        partition: validator.to_string(),
2276                    };
2277                    let journal = Journal::init(context.with_label("journal"), cfg)
2278                        .await
2279                        .expect("unable to create journal");
2280                    let cfg = config::Config {
2281                        crypto: scheme,
2282                        automaton: application.clone(),
2283                        relay: application.clone(),
2284                        committer: application,
2285                        supervisor,
2286                        mailbox_size: 1024,
2287                        namespace: namespace.clone(),
2288                        leader_timeout: Duration::from_secs(1),
2289                        notarization_timeout: Duration::from_secs(2),
2290                        nullify_retry: Duration::from_secs(10),
2291                        fetch_timeout: Duration::from_secs(1),
2292                        activity_timeout,
2293                        max_fetch_count: 1,
2294                        max_fetch_size: 1024 * 512,
2295                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2296                        fetch_concurrent: 1,
2297                        replay_concurrency: 1,
2298                    };
2299                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
2300                    engine.start(voter, resolver);
2301                }
2302            }
2303
2304            // Wait for all engines to finish
2305            let mut completed = HashSet::new();
2306            let mut finalized = HashMap::new();
2307            loop {
2308                let (validator, event) = done_receiver.next().await.unwrap();
2309                if let mocks::application::Progress::Finalized(proof, digest) = event {
2310                    let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
2311                    if digest != payload {
2312                        panic!(
2313                            "finalization mismatch digest: {:?}, payload: {:?}",
2314                            digest, payload
2315                        );
2316                    }
2317                    if let Some(previous) = finalized.insert(view, digest.clone()) {
2318                        if previous != digest {
2319                            panic!(
2320                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2321                                view, previous, digest
2322                            );
2323                        }
2324                    }
2325                    if (finalized.len() as u64) < required_containers {
2326                        continue;
2327                    }
2328                    completed.insert(validator);
2329                }
2330                if completed.len() == (n - 1) as usize {
2331                    break;
2332                }
2333            }
2334
2335            // Check supervisors for correct activity
2336            let byz = &validators[0];
2337            let mut count_nullify_and_finalize = 0;
2338            for supervisor in supervisors.iter() {
2339                // Ensure only faults for byz
2340                {
2341                    let faults = supervisor.faults.lock().unwrap();
2342                    assert_eq!(faults.len(), 1);
2343                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2344                    for (_, faults) in faulter.iter() {
2345                        for fault in faults.iter() {
2346                            match *fault {
2347                                NULLIFY_AND_FINALIZE => {
2348                                    count_nullify_and_finalize += 1;
2349                                }
2350                                _ => panic!("unexpected fault: {:?}", fault),
2351                            }
2352                        }
2353                    }
2354                }
2355            }
2356            assert!(count_nullify_and_finalize > 0);
2357        });
2358    }
2359}