//! Simple and fast BFT agreement inspired by Simplex Consensus.
//!
//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides
//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal
//! finalization latency in a partially synchronous setting.
//!
//! _If your application would benefit from succinct consensus certificates or a bias-resistant
//! VRF, see [Threshold Simplex](crate::threshold_simplex)._
//!
//! # Features
//!
//! * Wicked Fast Block Times (2 Network Hops)
//! * Optimal Finalization Latency (3 Network Hops)
//! * Externalized Uptime and Fault Proofs
//! * Decoupled Block Broadcast and Sync
//! * Flexible Block Format
//!
//! # Design
//!
//! ## Architecture
//!
//! All logic is split into two components: the `Voter` and the `Resolver` (and the user of `simplex`
//! provides `Application`). The `Voter` is responsible for participating in the latest view and the
//! `Resolver` is responsible for fetching artifacts from previous views required to verify proposed
//! blocks in the latest view.
//!
//! To provide great performance, all interactions between `Voter`, `Resolver`, and `Application` are
//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
//!
//! ```txt
//! +---------------+           +---------+            +++++++++++++++
//! |               |<----------+         +----------->+             +
//! |  Application  |           |  Voter  |            +    Peers    +
//! |               +---------->|         |<-----------+             +
//! +---------------+           +--+------+            +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//!                            +-------+----+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Resolver  |          +    Peers    +
//!                            |            |<---------+             +
//!                            +------------+          +++++++++++++++
//! ```
//!
//! _Application is usually a single object that implements the `Automaton`, `Relay`, `Committer`,
//! and `Supervisor` traits._
//!
//! ## Joining Consensus
//!
//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter`
//! will enter `v+1`. This means that a new participant joining consensus will immediately jump
//! ahead to the latest view and begin participating in consensus (assuming it can verify blocks).
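/*!

A minimal sketch of the view-advance rule described above, assuming exactly `n = 3f + 1`
participants so that a quorum is `2f + 1`. The names `threshold` and `next_view` are
hypothetical and are not part of this crate, which may compute its threshold differently for
other values of `n`.

```rust
/// Number of matching messages required to leave a view when up to `f`
/// participants may be Byzantine (assumes `n = 3f + 1`).
fn threshold(f: u32) -> u32 {
    2 * f + 1
}

/// Returns the view a participant should be in after observing `votes`
/// distinct messages of a single kind (notarize, nullify, or finalize)
/// for view `v`.
fn next_view(v: u64, votes: u32, f: u32) -> u64 {
    if votes >= threshold(f) {
        v + 1
    } else {
        v
    }
}
```

With `f = 1`, three matching messages for view `7` move a participant to view `8`, while two
leave it in view `7`.
*/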
//!
//! ## Persistence
//!
//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
//! the critical path. To enable recovery, the `Voter` writes valid messages it receives from
//! consensus and messages it generates to a write-ahead log (WAL) implemented by [`Journal`](https://docs.rs/commonware-storage/latest/commonware_storage/journal/index.html).
//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
//! on restart (especially in the case of unclean shutdown).
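/*!

The ordering constraint above (record, sync, then send) can be illustrated with a toy
in-memory log. This is only a sketch: `Wal`, `log_then_send`, and `already_sent` are
hypothetical names, and the real `Journal` API is not shown here.

```rust
/// Toy write-ahead log: `append` buffers a record and `sync` makes it durable.
#[derive(Default)]
struct Wal {
    buffered: Vec<Vec<u8>>,
    durable: Vec<Vec<u8>>,
}

impl Wal {
    /// Buffers a record without making it durable.
    fn append(&mut self, record: &[u8]) {
        self.buffered.push(record.to_vec());
    }

    /// Stand-in for flushing buffered records to disk.
    fn sync(&mut self) {
        self.durable.append(&mut self.buffered);
    }
}

/// Records a message, syncs the log, and only then hands the message to the network.
fn log_then_send(wal: &mut Wal, outbox: &mut Vec<Vec<u8>>, msg: &[u8]) {
    wal.append(msg);
    wal.sync();
    outbox.push(msg.to_vec());
}

/// After a restart, durable records reveal what this node has already sent.
fn already_sent(wal: &Wal, msg: &[u8]) -> bool {
    wal.durable.iter().any(|record| record.as_slice() == msg)
}
```

Because the sync happens before the send, a node that crashes right after sending still finds
the message in its log on restart and can avoid signing a conflicting one.
*/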
//!
//! ## Protocol Description
//!
//! ### Specification for View `v`
//!
//! Upon entering view `v`:
//! * Determine leader `l` for view `v`
//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
//!   * If leader `l` has not been active in the last `r` views, set `t_l` to 0.
//! * If we are leader `l`, broadcast `notarize(c,v)`
//!   * If we cannot propose a container in view `v` because a notarization/nullification for a
//!     previous view `v_m` is missing, request `v_m`
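/*!

As a rough illustration of the timer rule above, the sketch below derives `t_l` and `t_a` from
an assumed network delay bound `Δ`. The function `view_timers` and its `leader_recently_active`
flag are hypothetical; how leader activity over the last `r` views is tracked is left out.

```rust
use std::time::Duration;

/// Returns `(t_l, t_a)` for a newly entered view.
///
/// `delta` is the assumed message delay bound `Δ`. If the leader has not been
/// active recently, the leader timeout is set to zero so it fires immediately.
fn view_timers(delta: Duration, leader_recently_active: bool) -> (Duration, Duration) {
    let t_l = if leader_recently_active {
        delta * 2
    } else {
        Duration::ZERO
    };
    let t_a = delta * 3;
    (t_l, t_a)
}
```

For `Δ = 100ms` and an active leader this yields `t_l = 200ms` and `t_a = 300ms`.
*/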
//!
//! Upon receiving first `notarize(c,v)` from `l`:
//! * Cancel `t_l`
//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
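/*!

The precondition for voting can be sketched as a check over the set of views known to be
notarized or nullified. `can_vote` is a hypothetical helper, and treating the sets as plain
view numbers (ignoring which container was notarized) is a simplification.

```rust
use std::collections::BTreeSet;

/// Returns `true` if a proposal at `view` that builds on `parent_view` is safe
/// to verify and vote on: the parent must be notarized and every view strictly
/// between the parent and the proposal must be nullified.
fn can_vote(
    view: u64,
    parent_view: u64,
    notarized: &BTreeSet<u64>,
    nullified: &BTreeSet<u64>,
) -> bool {
    if parent_view >= view {
        return false;
    }
    if !notarized.contains(&parent_view) {
        return false;
    }
    (parent_view + 1..view).all(|skipped| nullified.contains(&skipped))
}
```

If any skipped view lacks a nullification, the missing artifact must be fetched before voting,
since that view might still be finalized.
*/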
//!
//! Upon receiving `2f+1` `notarize(c,v)`:
//! * Cancel `t_a`
//! * Mark `c` as notarized
//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
//! * If we have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `nullify(v)`:
//! * Broadcast `nullification(v)`
//!   * If we observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
//!     `nullification(*)` between `v_parent` and `v`. If `c_parent` is older than the last finalized container,
//!     broadcast the last finalization instead.
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `finalize(c,v)`:
//! * Mark `c` as finalized (and recursively finalize its parents)
//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
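/*!

Each of the `2f+1` rules above depends on counting distinct signers for the same view and
payload. The sketch below shows one way to do that; `Tally` and `record` are hypothetical names
and signature verification is omitted.

```rust
use std::collections::{HashMap, HashSet};

/// Tracks which signers voted for each `(view, payload)` pair.
#[derive(Default)]
struct Tally {
    votes: HashMap<(u64, Vec<u8>), HashSet<u32>>,
}

impl Tally {
    /// Records a vote and returns `true` only when this vote is the one that
    /// brings the number of distinct signers up to `threshold`.
    fn record(&mut self, view: u64, payload: &[u8], signer: u32, threshold: usize) -> bool {
        let signers = self.votes.entry((view, payload.to_vec())).or_default();
        let inserted = signers.insert(signer);
        inserted && signers.len() == threshold
    }
}
```

Returning `true` exactly once per `(view, payload)` lets the caller broadcast the corresponding
certificate a single time, even if duplicate or late votes keep arriving.
*/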
//!
//! Upon `t_l` or `t_a` firing:
//! * Broadcast `nullify(v)`
//! * Every `t_r` after the `nullify(v)` broadcast, while we are still in view `v`:
//!   * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
//!
//! ### Deviations from Simplex Consensus
//!
//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
//!   a set of all notarizations/nullifications for all historical blocks.
//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
//!   either a "block" or a "dummy block", respectively.
//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
//!   some number of views (again to trigger early view transition for an unresponsive leader).
//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (the only
//!   way to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).

use commonware_utils::Array;

mod encoder;
mod prover;
pub use prover::Prover;
mod wire {
    include!(concat!(env!("OUT_DIR"), "/simplex.wire.rs"));
}

cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod verifier;
    }
}

#[cfg(test)]
pub mod mocks;

/// View is a monotonically increasing counter that represents the current focus of consensus.
pub type View = u64;

use crate::Activity;

/// Context is a collection of metadata from consensus about a given payload.
#[derive(Clone)]
pub struct Context<D: Array> {
    /// Current view of consensus.
    pub view: View,

    /// Parent the payload is built on.
    ///
    /// If there is a gap between the current view and the parent view, the participant
    /// must possess a nullification for each discarded view to safely vote on the proposed
    /// payload (any view without a nullification may eventually be finalized and skipping
    /// it would result in a fork).
    pub parent: (View, D),
}

/// Notarize a payload at a given view.
///
/// ## Clarifications
/// * A vote from the leader is considered both a proposal and a vote.
/// * It is ok to have both a vote for a proposal and the null
///   container in the same view.
/// * It is ok to notarize/finalize different proposals in the same view.
pub const NOTARIZE: Activity = 0;
/// Finalize a payload at a given view.
pub const FINALIZE: Activity = 1;
/// Notarize a payload that conflicts with a previous notarize.
pub const CONFLICTING_NOTARIZE: Activity = 2;
/// Finalize a payload that conflicts with a previous finalize.
pub const CONFLICTING_FINALIZE: Activity = 3;
/// Nullify and finalize in the same view.
pub const NULLIFY_AND_FINALIZE: Activity = 4;

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest as Sha256Digest, Ed25519, Scheme, Sha256};
    use commonware_macros::{select, test_traced};
    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{
        deterministic::{self, Executor},
        Clock, Metrics, Runner, Spawner,
    };
    use commonware_storage::journal::variable::{Config as JConfig, Journal};
    use commonware_utils::quorum;
    use engine::Engine;
    use futures::{channel::mpsc, StreamExt};
    use governor::Quota;
    use prover::Prover;
    use rand::Rng;
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        num::NonZeroU32,
        sync::{Arc, Mutex},
        time::Duration,
    };
    use tracing::debug;

    /// Registers all validators using the oracle.
    async fn register_validators<P: Array>(
        oracle: &mut Oracle<P>,
        validators: &[P],
    ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
        let mut registrations = HashMap::new();
        for validator in validators.iter() {
            let (voter_sender, voter_receiver) =
                oracle.register(validator.clone(), 0).await.unwrap();
            let (resolver_sender, resolver_receiver) =
                oracle.register(validator.clone(), 1).await.unwrap();
            registrations.insert(
                validator.clone(),
                (
                    (voter_sender, voter_receiver),
                    (resolver_sender, resolver_receiver),
                ),
            );
        }
        registrations
    }

    /// Enum to describe the action to take when linking validators.
    enum Action {
        Link(Link),
        Update(Link), // Unlink and then link
        Unlink,
    }

    /// Links (or unlinks) validators using the oracle.
    ///
    /// The `action` parameter determines the action (e.g. link, unlink) to take.
    /// The `restrict_to` function can be used to restrict the linking to certain connections,
    /// otherwise all validators will be linked to all other validators.
    async fn link_validators<P: Array>(
        oracle: &mut Oracle<P>,
        validators: &[P],
        action: Action,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                // Ignore self
                if v2 == v1 {
                    continue;
                }

                // Restrict to certain connections
                if let Some(f) = restrict_to {
                    if !f(validators.len(), i1, i2) {
                        continue;
                    }
                }

                // Do any unlinking first
                match action {
                    Action::Update(_) | Action::Unlink => {
                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                    }
                    _ => {}
                }

                // Do any linking after
                match action {
                    Action::Link(ref link) | Action::Update(ref link) => {
                        oracle
                            .add_link(v1.clone(), v2.clone(), link.clone())
                            .await
                            .unwrap();
                    }
                    _ => {}
                }
            }
        }
    }

    #[test_traced]
    fn test_all_online() {
        // Create context
        let n = 5;
        let threshold = quorum(n).expect("unable to calculate threshold");
        let max_exceptions = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for scheme in schemes.into_iter() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            let latest_complete = required_containers - activity_timeout;
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no forks
                let mut exceptions = 0;
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        // Ensure only one payload proposed per view
                        if payloads.len() > 1 {
                            panic!("view: {}", view);
                        }

                        // Only check at views below timeout
                        if *view > latest_complete {
                            continue;
                        }

                        // Ensure everyone participating
                        let digest = finalized.get(view).expect("view should be finalized");
                        let voters = payloads.get(digest).expect("digest should exist");
                        if voters.len() < threshold as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {}", view);
                        }
                        if voters.len() != n as usize {
                            exceptions += 1;
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        // Ensure only one payload proposed per view
                        if payloads.len() > 1 {
                            panic!("view: {}", view);
                        }

                        // Only check at views below timeout
                        if *view > latest_complete {
                            continue;
                        }

                        // Ensure everyone participating
                        let digest = finalized.get(view).expect("view should be finalized");
                        let finalizers = payloads.get(digest).expect("digest should exist");
                        if finalizers.len() < threshold as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {}", view);
                        }
                        if finalizers.len() != n as usize {
                            exceptions += 1;
                        }
                    }
                }

                // Ensure exceptions within allowed
                assert!(exceptions <= max_exceptions);
            }
        });
    }

490    #[test_traced]
491    fn test_unclean_shutdown() {
492        // Create context
493        let n = 5;
494        let required_containers = 100;
495        let activity_timeout = 10;
496        let namespace = b"consensus".to_vec();
497
498        // Random restarts every x seconds
499        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
500        let notarized = Arc::new(Mutex::new(HashMap::new()));
501        let finalized = Arc::new(Mutex::new(HashMap::new()));
502        let completed = Arc::new(Mutex::new(HashSet::new()));
503        let supervised = Arc::new(Mutex::new(Vec::new()));
504        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(30));
505        while completed.lock().unwrap().len() != n as usize {
506            let namespace = namespace.clone();
507            let shutdowns = shutdowns.clone();
508            let notarized = notarized.clone();
509            let finalized = finalized.clone();
510            let completed = completed.clone();
511            let supervised = supervised.clone();
512            executor.start({
513                let mut context = context.clone();
514                async move {
515                // Create simulated network
516                let (network, mut oracle) = Network::new(
517                    context.with_label("network"),
518                    Config {
519                        max_size: 1024 * 1024,
520                    },
521                );
522
523                // Start network
524                network.start();
525
526                // Register participants
527                let mut schemes = Vec::new();
528                let mut validators = Vec::new();
529                for i in 0..n {
530                    let scheme = Ed25519::from_seed(i as u64);
531                    let pk = scheme.public_key();
532                    schemes.push(scheme);
533                    validators.push(pk);
534                }
535                validators.sort();
536                schemes.sort_by_key(|s| s.public_key());
537                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
538                let mut registrations = register_validators(&mut oracle, &validators).await;
539
540                // Link all validators
541                let link = Link {
542                    latency: 50.0,
543                    jitter: 50.0,
544                    success_rate: 1.0,
545                };
546                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
547
548                // Create engines
549                let prover = Prover::new(&namespace);
550                let relay = Arc::new(mocks::relay::Relay::new());
551                let mut supervisors = HashMap::new();
552                let (done_sender, mut done_receiver) = mpsc::unbounded();
553                let mut engine_handlers = Vec::new();
554                for scheme in schemes.into_iter() {
555                    // Create scheme context
556                    let context = context
557                        .clone()
558                        .with_label(&format!("validator-{}", scheme.public_key()));
559
560                    // Start engine
561                    let validator = scheme.public_key();
562                    let supervisor_config = mocks::supervisor::Config {
563                        prover: prover.clone(),
564                        participants: view_validators.clone(),
565                    };
566                    let supervisor =
567                        mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
568                    supervisors.insert(validator.clone(), supervisor.clone());
569                    let application_cfg = mocks::application::Config {
570                        hasher: Sha256::default(),
571                        relay: relay.clone(),
572                        participant: validator.clone(),
573                        tracker: done_sender.clone(),
574                        propose_latency: (10.0, 5.0),
575                        verify_latency: (10.0, 5.0),
576                    };
577                    let (actor, application) =
578                        mocks::application::Application::new(context.with_label("application"), application_cfg);
579                    actor.start();
580                    let cfg = JConfig {
581                        partition: validator.to_string(),
582                    };
583                    let journal = Journal::init(context.with_label("journal"), cfg)
584                        .await
585                        .expect("unable to create journal");
586                    let cfg = config::Config {
587                        crypto: scheme,
588                        automaton: application.clone(),
589                        relay: application.clone(),
590                        committer: application,
591                        supervisor,
592                        mailbox_size: 1024,
593                        namespace: namespace.clone(),
594                        leader_timeout: Duration::from_secs(1),
595                        notarization_timeout: Duration::from_secs(2),
596                        nullify_retry: Duration::from_secs(10),
597                        fetch_timeout: Duration::from_secs(1),
598                        activity_timeout,
599                        max_fetch_count: 1,
600                        max_fetch_size: 1024 * 512,
601                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
602                        fetch_concurrent: 1,
603                        replay_concurrency: 1,
604                    };
605                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
606                    let (voter_network, resolver_network) =
607                        registrations.remove(&validator).expect("validator should be registered");
608                    engine_handlers.push(engine.start(voter_network, resolver_network));
609                }
610
611                // Wait for all engines to finish
612                context.with_label("confirmed").spawn(move |_| async move {
613                    loop {
614                        // Parse events
615                        let (validator, event) = done_receiver.next().await.unwrap();
616                        match event {
617                            mocks::application::Progress::Notarized(proof, digest) => {
618                                // Check correctness of proof
619                                let (view, _, payload, _) =
620                                    prover.deserialize_notarization(proof, 5, true).unwrap();
621                                if digest != payload {
622                                    panic!(
623                                        "notarization mismatch digest: {:?}, payload: {:?}",
624                                        digest, payload
625                                    );
626                                }
627
628                                // Store notarized
629                                {
630                                    let mut notarized = notarized.lock().unwrap();
631                                    if let Some(previous) = notarized.insert(view, digest.clone())
632                                    {
633                                        if previous != digest {
634                                            panic!(
635                                                "notarization mismatch at {:?} previous: {:?}, current: {:?}",
636                                                view, previous, digest
637                                            );
638                                        }
639                                    }
640                                    if (notarized.len() as u64) < required_containers {
641                                        continue;
642                                    }
643                                }
644                            }
                            mocks::application::Progress::Finalized(proof, digest) => {
                                // Check correctness of proof
                                let (view, _, payload, _) =
                                    prover.deserialize_finalization(proof, 5, true).unwrap();
                                if digest != payload {
                                    panic!(
                                        "finalization mismatch digest: {:?}, payload: {:?}",
                                        digest, payload
                                    );
                                }

                                // Store finalized
                                {
                                    let mut finalized = finalized.lock().unwrap();
                                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                                        if previous != digest {
                                            panic!(
                                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                                view, previous, digest
                                            );
                                        }
                                    }
                                    if (finalized.len() as u64) < required_containers {
                                        continue;
                                    }
                                }
                                completed.lock().unwrap().insert(validator);
                            }
                        }
                    }
                });

                // Exit at random points for unclean shutdown of entire set
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                context.sleep(wait).await;
                {
                    let mut shutdowns = shutdowns.lock().unwrap();
                    debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                    *shutdowns += 1;
                }

                // Collect supervisors
                supervised.lock().unwrap().push(supervisors);
            }});

            // Recover context
            (executor, context, _) = context.recover();
        }

        // Check supervisors for fault activity
        let supervised = supervised.lock().unwrap();
        for supervisors in supervised.iter() {
            for (_, supervisor) in supervisors.iter() {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }
        }
    }
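
    // A minimal sketch (not part of the original test suite): the tests in this
    // module repeat the same finalization bookkeeping inline, namely inserting
    // `(view, digest)` into a map and failing if a different digest was already
    // recorded for that view. The hypothetical helper `record_finalization`
    // below shows one way that step could be factored out. Its name, signature,
    // and `Result` return type are assumptions made for illustration; the tests
    // themselves panic on a mismatch. It returns the number of distinct views
    // recorded so far, which a caller could compare against `required_containers`.
    #[allow(dead_code)]
    fn record_finalization<V, D>(
        finalized: &mut std::collections::HashMap<V, D>,
        view: V,
        digest: D,
    ) -> Result<usize, String>
    where
        V: std::hash::Hash + Eq + Copy + std::fmt::Debug,
        D: PartialEq + std::fmt::Debug,
    {
        // A view that was already recorded must carry the same digest.
        if let Some(previous) = finalized.get(&view) {
            if *previous != digest {
                return Err(format!(
                    "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                    view, previous, digest
                ));
            }
            return Ok(finalized.len());
        }

        // First time this view is seen: record it.
        finalized.insert(view, digest);
        Ok(finalized.len())
    }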

    #[test_traced]
    fn test_backfill() {
        // Create context
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(360));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except first
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1, // force many fetches
                    max_fetch_size: 1024 * 1024,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all online engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    finalized.insert(view, digest);
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == (n - 1) as usize {
                    break;
                }
            }

            // Degrade network connections for online peers
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Wait for nullifications to accrue
            context.sleep(Duration::from_secs(120)).await;

            // Unlink second peer from all (except first)
            link_validators(
                &mut oracle,
                &validators,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Start engine for first peer
            let scheme = schemes[0].clone();
            let validator = scheme.public_key();
            {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Link first peer to all (except second)
                link_validators(
                    &mut oracle,
                    &validators,
                    Action::Link(link),
                    Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
                )
                .await;

                // Restore network connections for all online peers
                let link = Link {
                    latency: 10.0,
                    jitter: 2.5,
                    success_rate: 1.0,
                };
                link_validators(
                    &mut oracle,
                    &validators,
                    Action::Update(link),
                    Some(|_, i, j| ![i, j].contains(&1usize)),
                )
                .await;

                // Start engine
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for new engine to finalize the required number of containers
            let mut finalized = HashMap::new();
            let mut validator_finalized = HashSet::new();
            loop {
                let (candidate, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if validator == candidate {
                        validator_finalized.insert(view);
                    }
                }
                if validator_finalized.len() == required_containers as usize {
                    break;
                }
            }
        });
    }

    #[test_traced]
    fn test_one_offline() {
        // Create context
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except first
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all online engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == (n - 1) as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            let offline = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure offline node is never active
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(offline) {
                                panic!("offline validator notarized in view {}", view);
                            }
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(offline) {
                                panic!("offline validator finalized in view {}", view);
                            }
                        }
                    }
                }
            }
        });
    }

    #[test_traced]
    fn test_slow_validator() {
        // Create context
        let n = 5;
        let required_containers = 50;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = if idx_scheme == 0 {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        tracker: done_sender.clone(),
                        propose_latency: (3_000.0, 0.0),
                        verify_latency: (3_000.0, 5.0),
                    }
                } else {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        tracker: done_sender.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    }
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            let slow = &validators[0];
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure slow node is never active
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(slow) {
                                panic!("slow validator notarized in view {}", view);
                            }
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(slow) {
                                panic!("slow validator finalized in view {}", view);
                            }
                        }
                    }
                }
            }
        });
    }

    #[test_traced]
    fn test_all_recovery() {
        // Create context
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(120));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create engines
1398            let prover = Prover::new(&namespace);
1399            let relay = Arc::new(mocks::relay::Relay::new());
1400            let mut supervisors = Vec::new();
1401            let (done_sender, mut done_receiver) = mpsc::unbounded();
1402            let mut engine_handlers = Vec::new();
1403            for scheme in schemes.iter() {
1404                // Create scheme context
1405                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1406
1407                // Start engine
1408                let validator = scheme.public_key();
1409                let supervisor_config = mocks::supervisor::Config {
1410                    prover: prover.clone(),
1411                    participants: view_validators.clone(),
1412                };
1413                let supervisor =
1414                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1415                supervisors.push(supervisor.clone());
1416                let application_cfg = mocks::application::Config {
1417                    hasher: Sha256::default(),
1418                    relay: relay.clone(),
1419                    participant: validator.clone(),
1420                    tracker: done_sender.clone(),
1421                    propose_latency: (10.0, 5.0),
1422                    verify_latency: (10.0, 5.0),
1423                };
1424                let (actor, application) = mocks::application::Application::new(
1425                    context.with_label("application"),
1426                    application_cfg,
1427                );
1428                actor.start();
1429                let cfg = JConfig {
1430                    partition: validator.to_string(),
1431                };
1432                let journal = Journal::init(context.with_label("journal"), cfg)
1433                    .await
1434                    .expect("unable to create journal");
1435                let cfg = config::Config {
1436                    crypto: scheme.clone(),
1437                    automaton: application.clone(),
1438                    relay: application.clone(),
1439                    committer: application,
1440                    supervisor,
1441                    mailbox_size: 1024,
1442                    namespace: namespace.clone(),
1443                    leader_timeout: Duration::from_secs(1),
1444                    notarization_timeout: Duration::from_secs(2),
1445                    nullify_retry: Duration::from_secs(10),
1446                    fetch_timeout: Duration::from_secs(1),
1447                    activity_timeout,
1448                    max_fetch_count: 1,
1449                    max_fetch_size: 1024 * 512,
1450                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1451                    fetch_concurrent: 1,
1452                    replay_concurrency: 1,
1453                };
1454                let (voter, resolver) = registrations
1455                    .remove(&validator)
1456                    .expect("validator should be registered");
1457                let engine = Engine::new(context.with_label("engine"), journal, cfg);
1458                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for one virtual minute (shouldn't finalize anything)
            select! {
                _timeout = context.sleep(Duration::from_secs(60)) => {},
                _done = done_receiver.next() => {
                    panic!("engine should not notarize or finalize anything");
                }
            }

            // Update links
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Update(link), None).await;

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
            }
        });
    }

    #[test_traced]
    fn test_partition() {
        // Create context
        let n = 10;
        let required_containers = 50;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(900));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for scheme in schemes.iter() {
                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            let mut highest_finalized = 0;
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 10, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if view > highest_finalized {
                        highest_finalized = view;
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n {
                    break;
                }
            }

            // Cut all links between validator halves
            fn separated(n: usize, a: usize, b: usize) -> bool {
                let m = n / 2;
                (a < m && b >= m) || (a >= m && b < m)
            }
            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;

            // Wait for any in-progress notarizations/finalizations to finish
            context.sleep(Duration::from_secs(10)).await;

            // Drain any events already queued in the done receiver
            // (stops when the channel is empty or closed)
            while let Ok(Some(_)) = done_receiver.try_next() {}

            // Wait for a few virtual minutes (shouldn't finalize anything)
            select! {
                _timeout = context.sleep(Duration::from_secs(600)) => {},
                _done = done_receiver.next() => {
                    panic!("engine should not notarize or finalize anything");
                }
            }

            // Restore links
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(separated),
            )
            .await;

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 10, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers + highest_finalized {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n {
                    break;
                }
            }

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
            }
        });
    }

    fn slow_and_lossy_links(seed: u64) -> String {
        // Create context
        let n = 5;
        let required_containers = 50;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config {
            seed,
            timeout: Some(Duration::from_secs(3_000)),
            ..deterministic::Config::default()
        };
        let (executor, context, auditor) = Executor::init(cfg);
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let degraded_link = Link {
                latency: 200.0,
                jitter: 150.0,
                success_rate: 0.5,
            };
            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            let mut engine_handlers = Vec::new();
            for scheme in schemes.into_iter() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    tracker: done_sender.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = JConfig {
                    partition: validator.to_string(),
                };
                let journal = Journal::init(context.with_label("journal"), cfg)
                    .await
                    .expect("unable to create journal");
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    committer: application,
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    max_fetch_count: 1,
                    max_fetch_size: 1024 * 512,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                    fetch_concurrent: 1,
                    replay_concurrency: 1,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), journal, cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait for all engines to finish
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == n as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            for supervisor in supervisors.iter() {
                // Ensure no faults
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
            }
        });
        auditor.state()
    }

    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(0);
    }

    #[test_traced]
    fn test_determinism() {
        // We use slow and lossy links as the deterministic test
        // because it is the most complex test.
        for seed in 1..6 {
            // Run test with seed
            let state_1 = slow_and_lossy_links(seed);

            // Run test again with same seed
            let state_2 = slow_and_lossy_links(seed);

            // Ensure states are equal
            assert_eq!(state_1, state_2);
        }
    }

    #[test_traced]
    fn test_conflicter() {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                if idx_scheme == 0 {
                    let cfg = mocks::conflicter::Config {
                        crypto: scheme,
                        supervisor,
                        namespace: namespace.clone(),
                    };
                    let (voter, _) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
                        mocks::conflicter::Conflicter::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(voter);
                } else {
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        tracker: done_sender.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let cfg = JConfig {
                        partition: validator.to_string(),
                    };
                    let journal = Journal::init(context.with_label("journal"), cfg)
                        .await
                        .expect("unable to create journal");
                    let cfg = config::Config {
                        crypto: scheme,
                        automaton: application.clone(),
                        relay: application.clone(),
                        committer: application,
                        supervisor,
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        max_fetch_count: 1,
                        max_fetch_size: 1024 * 512,
                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                        fetch_concurrent: 1,
                        replay_concurrency: 1,
                    };
                    let (voter, resolver) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
                    engine.start(voter, resolver);
                }
2030            }
2031
2032            // Wait for all engines to finish
2033            let mut completed = HashSet::new();
2034            let mut finalized = HashMap::new();
2035            loop {
2036                let (validator, event) = done_receiver.next().await.unwrap();
2037                if let mocks::application::Progress::Finalized(proof, digest) = event {
2038                    let (view, _, payload, _) =
2039                        prover.deserialize_finalization(proof, 5, true).unwrap();
2040                    if digest != payload {
2041                        panic!(
2042                            "finalization mismatch digest: {:?}, payload: {:?}",
2043                            digest, payload
2044                        );
2045                    }
2046                    if let Some(previous) = finalized.insert(view, digest.clone()) {
2047                        if previous != digest {
2048                            panic!(
2049                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2050                                view, previous, digest
2051                            );
2052                        }
2053                    }
2054                    if (finalized.len() as u64) < required_containers {
2055                        continue;
2056                    }
2057                    completed.insert(validator);
2058                }
2059                if completed.len() == (n - 1) as usize {
2060                    break;
2061                }
2062            }

            // Check supervisors for correct activity
            let byz = &validators[0];
            let mut count_conflicting_notarize = 0;
            let mut count_conflicting_finalize = 0;
            for supervisor in supervisors.iter() {
                // Ensure faults are recorded only for the byzantine validator
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match *fault {
                                CONFLICTING_NOTARIZE => {
                                    count_conflicting_notarize += 1;
                                }
                                CONFLICTING_FINALIZE => {
                                    count_conflicting_finalize += 1;
                                }
                                _ => panic!("unexpected fault: {:?}", fault),
                            }
                        }
                    }
                }
            }
            assert!(count_conflicting_notarize > 0);
            assert!(count_conflicting_finalize > 0);
        });
    }

    #[test_traced]
    fn test_nuller() {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();
        let (executor, context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create engines
            let prover = Prover::new(&namespace);
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let (done_sender, mut done_receiver) = mpsc::unbounded();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Start engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    prover: prover.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor =
                    mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                if idx_scheme == 0 {
                    // The first validator runs the byzantine `Nuller` mock
                    let cfg = mocks::nuller::Config {
                        crypto: scheme,
                        supervisor,
                        namespace: namespace.clone(),
                    };
                    let (voter, _) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
                    engine.start(voter);
                } else {
                    // All other validators run an honest engine
                    supervisors.push(supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        tracker: done_sender.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let cfg = JConfig {
                        partition: validator.to_string(),
                    };
                    let journal = Journal::init(context.with_label("journal"), cfg)
                        .await
                        .expect("unable to create journal");
                    let cfg = config::Config {
                        crypto: scheme,
                        automaton: application.clone(),
                        relay: application.clone(),
                        committer: application,
                        supervisor,
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        max_fetch_count: 1,
                        max_fetch_size: 1024 * 512,
                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                        fetch_concurrent: 1,
                        replay_concurrency: 1,
                    };
                    let (voter, resolver) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    let engine = Engine::new(context.with_label("engine"), journal, cfg);
                    engine.start(voter, resolver);
                }
            }

            // Wait for all honest engines to finalize the required number of containers
            let mut completed = HashSet::new();
            let mut finalized = HashMap::new();
            loop {
                let (validator, event) = done_receiver.next().await.unwrap();
                if let mocks::application::Progress::Finalized(proof, digest) = event {
                    let (view, _, payload, _) =
                        prover.deserialize_finalization(proof, 5, true).unwrap();
                    if digest != payload {
                        panic!(
                            "finalization mismatch digest: {:?}, payload: {:?}",
                            digest, payload
                        );
                    }
                    if let Some(previous) = finalized.insert(view, digest.clone()) {
                        if previous != digest {
                            panic!(
                                "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                view, previous, digest
                            );
                        }
                    }
                    if (finalized.len() as u64) < required_containers {
                        continue;
                    }
                    completed.insert(validator);
                }
                if completed.len() == (n - 1) as usize {
                    break;
                }
            }

            // Check supervisors for correct activity
            let byz = &validators[0];
            let mut count_nullify_and_finalize = 0;
            for supervisor in supervisors.iter() {
                // Ensure faults are recorded only for the byzantine validator
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match *fault {
                                NULLIFY_AND_FINALIZE => {
                                    count_nullify_and_finalize += 1;
                                }
                                _ => panic!("unexpected fault: {:?}", fault),
                            }
                        }
                    }
                }
            }
            assert!(count_nullify_and_finalize > 0);
        });
    }
}