commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal
5//! finalization latency in a partially synchronous setting.
6//!
7//! _If your application would benefit from succinct consensus certificates or a bias-resistant
8//! VRF, see [crate::threshold_simplex]._
9//!
10//! # Features
11//!
12//! * Wicked Fast Block Times (2 Network Hops)
13//! * Optimal Finalization Latency (3 Network Hops)
14//! * Externalized Uptime and Fault Proofs
15//! * Decoupled Block Broadcast and Sync
16//! * Flexible Block Format
17//!
18//! # Design
19//!
20//! ## Architecture
21//!
22//! All logic is split into three components: the `Voter`, the `Resolver`, and the `Application` (provided by the user).
23//! The `Voter` is responsible for participating in the latest view and the `Resolver` is responsible for fetching artifacts
24//! from previous views required to verify proposed blocks in the latest view.
25//!
26//! To provide great performance, all interactions between `Voter`, `Resolver`, and `Application` are
27//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
28//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
29//!
30//! ```txt
31//! +---------------+           +---------+            +++++++++++++++
32//! |               |<----------+         +----------->+             +
33//! |  Application  |           |  Voter  |            +    Peers    +
34//! |               +---------->|         |<-----------+             +
35//! +---------------+           +--+------+            +++++++++++++++
36//!                                |   ^
37//!                                |   |
38//!                                |   |
39//!                                |   |
40//!                                v   |
41//!                            +-------+----+          +++++++++++++++
42//!                            |            +--------->+             +
43//!                            |  Resolver  |          +    Peers    +
44//!                            |            |<---------+             +
45//!                            +------------+          +++++++++++++++
46//! ```
47//!
48//! _Application is usually a single object that implements the `Automaton`, `Relay`, `Committer`,
49//! and `Supervisor` traits._
50//!
51//! ## Joining Consensus
52//!
53//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter`
54//! will enter `v+1`. This means that a new participant joining consensus will immediately jump
55//! ahead to the latest view and begin participating in consensus (assuming it can verify blocks).
56//!
57//! ## Persistence
58//!
59//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
60//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
61//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
62//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
63//! on restart (especially in the case of unclean shutdown).
64//!
65//! ## Protocol Description
66//!
67//! ### Specification for View `v`
68//!
69//! Upon entering view `v`:
70//! * Determine leader `l` for view `v`
71//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
72//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
73//! * If leader `l`, broadcast `notarize(c,v)`
74//!   * If can't propose container in view `v` because missing notarization/nullification for a
75//!     previous view `v_m`, request `v_m`
76//!
77//! Upon receiving first `notarize(c,v)` from `l`:
78//! * Cancel `t_l`
79//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
80//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
81//!
82//! Upon receiving `2f+1` `notarize(c,v)`:
83//! * Cancel `t_a`
84//! * Mark `c` as notarized
85//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
86//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
87//! * Enter `v+1`
88//!
89//! Upon receiving `2f+1` `nullify(v)`:
90//! * Broadcast `nullification(v)`
91//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
92//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
93//!       instead.
94//! * Enter `v+1`
95//!
96//! Upon receiving `2f+1` `finalize(c,v)`:
97//! * Mark `c` as finalized (and recursively finalize its parents)
98//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
99//!
100//! Upon `t_l` or `t_a` firing:
101//! * Broadcast `nullify(v)`
102//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
103//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
104//!
105//! ### Deviations from Simplex Consensus
106//!
107//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
108//!   a set of all notarizations/nullifications for all historical blocks.
109//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
110//!   either a "block" or a "dummy block", respectively.
111//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
112//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
113//!   some number of views (again to trigger early view transition for an unresponsive leader).
114//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
115//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
116
117pub mod types;
118
119cfg_if::cfg_if! {
120    if #[cfg(not(target_arch = "wasm32"))] {
121        mod actors;
122        mod config;
123        pub use config::Config;
124        mod engine;
125        pub use engine::Engine;
126        mod metrics;
127    }
128}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::Monitor;
137    use commonware_cryptography::{
138        ed25519::{PrivateKey, PublicKey},
139        sha256::Digest as Sha256Digest,
140        PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141    };
142    use commonware_macros::{select, test_traced};
143    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
145    use commonware_utils::{quorum, NZU32};
146    use engine::Engine;
147    use futures::{future::join_all, StreamExt};
148    use governor::Quota;
149    use rand::Rng as _;
150    use std::{
151        collections::{BTreeMap, HashMap},
152        sync::{Arc, Mutex},
153        time::Duration,
154    };
155    use tracing::debug;
156    use types::Activity;
157
158    /// Registers all validators using the oracle.
159    async fn register_validators<P: CPublicKey>(
160        oracle: &mut Oracle<P>,
161        validators: &[P],
162    ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
163        let mut registrations = HashMap::new();
164        for validator in validators.iter() {
165            let (voter_sender, voter_receiver) =
166                oracle.register(validator.clone(), 0).await.unwrap();
167            let (resolver_sender, resolver_receiver) =
168                oracle.register(validator.clone(), 1).await.unwrap();
169            registrations.insert(
170                validator.clone(),
171                (
172                    (voter_sender, voter_receiver),
173                    (resolver_sender, resolver_receiver),
174                ),
175            );
176        }
177        registrations
178    }
179
180    /// Enum to describe the action to take when linking validators.
181    enum Action {
182        Link(Link),
183        Update(Link), // Unlink and then link
184        Unlink,
185    }
186
187    /// Links (or unlinks) validators using the oracle.
188    ///
189    /// The `action` parameter determines the action (e.g. link, unlink) to take.
190    /// The `restrict_to` function can be used to restrict the linking to certain connections,
191    /// otherwise all validators will be linked to all other validators.
192    async fn link_validators<P: CPublicKey>(
193        oracle: &mut Oracle<P>,
194        validators: &[P],
195        action: Action,
196        restrict_to: Option<fn(usize, usize, usize) -> bool>,
197    ) {
198        for (i1, v1) in validators.iter().enumerate() {
199            for (i2, v2) in validators.iter().enumerate() {
200                // Ignore self
201                if v2 == v1 {
202                    continue;
203                }
204
205                // Restrict to certain connections
206                if let Some(f) = restrict_to {
207                    if !f(validators.len(), i1, i2) {
208                        continue;
209                    }
210                }
211
212                // Do any unlinking first
213                match action {
214                    Action::Update(_) | Action::Unlink => {
215                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
216                    }
217                    _ => {}
218                }
219
220                // Do any linking after
221                match action {
222                    Action::Link(ref link) | Action::Update(ref link) => {
223                        oracle
224                            .add_link(v1.clone(), v2.clone(), link.clone())
225                            .await
226                            .unwrap();
227                    }
228                    _ => {}
229                }
230            }
231        }
232    }
233
234    #[test_traced]
235    fn test_all_online() {
236        // Create context
237        let n = 5;
238        let threshold = quorum(n);
239        let max_exceptions = 10;
240        let required_containers = 100;
241        let activity_timeout = 10;
242        let skip_timeout = 5;
243        let namespace = b"consensus".to_vec();
244        let executor = deterministic::Runner::timed(Duration::from_secs(30));
245        executor.start(|context| async move {
246            // Create simulated network
247            let (network, mut oracle) = Network::new(
248                context.with_label("network"),
249                Config {
250                    max_size: 1024 * 1024,
251                },
252            );
253
254            // Start network
255            network.start();
256
257            // Register participants
258            let mut schemes = Vec::new();
259            let mut validators = Vec::new();
260            for i in 0..n {
261                let scheme = PrivateKey::from_seed(i as u64);
262                let pk = scheme.public_key();
263                schemes.push(scheme);
264                validators.push(pk);
265            }
266            validators.sort();
267            schemes.sort_by_key(|s| s.public_key());
268            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
269            let mut registrations = register_validators(&mut oracle, &validators).await;
270
271            // Link all validators
272            let link = Link {
273                latency: 10.0,
274                jitter: 1.0,
275                success_rate: 1.0,
276            };
277            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
278
279            // Create engines
280            let relay = Arc::new(mocks::relay::Relay::new());
281            let mut supervisors = Vec::new();
282            let mut engine_handlers = Vec::new();
283            for scheme in schemes.into_iter() {
284                // Create scheme context
285                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
286
287                // Start engine
288                let validator = scheme.public_key();
289                let supervisor_config = mocks::supervisor::Config {
290                    namespace: namespace.clone(),
291                    participants: view_validators.clone(),
292                };
293                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
294                    supervisor_config,
295                );
296                supervisors.push(supervisor.clone());
297                let application_cfg = mocks::application::Config {
298                    hasher: Sha256::default(),
299                    relay: relay.clone(),
300                    participant: validator.clone(),
301                    propose_latency: (10.0, 5.0),
302                    verify_latency: (10.0, 5.0),
303                };
304                let (actor, application) = mocks::application::Application::new(
305                    context.with_label("application"),
306                    application_cfg,
307                );
308                actor.start();
309                let cfg = config::Config {
310                    crypto: scheme,
311                    automaton: application.clone(),
312                    relay: application.clone(),
313                    reporter: supervisor.clone(),
314                    supervisor,
315                    partition: validator.to_string(),
316                    compression: Some(3),
317                    mailbox_size: 1024,
318                    namespace: namespace.clone(),
319                    leader_timeout: Duration::from_secs(1),
320                    notarization_timeout: Duration::from_secs(2),
321                    nullify_retry: Duration::from_secs(10),
322                    fetch_timeout: Duration::from_secs(1),
323                    activity_timeout,
324                    skip_timeout,
325                    max_fetch_count: 1,
326                    max_participants: n as usize,
327                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
328                    fetch_concurrent: 1,
329                    replay_buffer: 1024 * 1024,
330                    write_buffer: 1024 * 1024,
331                };
332                let engine = Engine::new(context.with_label("engine"), cfg);
333                let (voter, resolver) = registrations
334                    .remove(&validator)
335                    .expect("validator should be registered");
336                engine_handlers.push(engine.start(voter, resolver));
337            }
338
339            // Wait for all engines to finish
340            let mut finalizers = Vec::new();
341            for supervisor in supervisors.iter_mut() {
342                let (mut latest, mut monitor) = supervisor.subscribe().await;
343                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
344                    while latest < required_containers {
345                        latest = monitor.next().await.expect("event missing");
346                    }
347                }));
348            }
349            join_all(finalizers).await;
350
351            // Check supervisors for correct activity
352            let latest_complete = required_containers - activity_timeout;
353            for supervisor in supervisors.iter() {
354                // Ensure no faults
355                {
356                    let faults = supervisor.faults.lock().unwrap();
357                    assert!(faults.is_empty());
358                }
359
360                // Ensure no forks
361                let mut exceptions = 0;
362                let mut notarized = HashMap::new();
363                let mut finalized = HashMap::new();
364                {
365                    let notarizes = supervisor.notarizes.lock().unwrap();
366                    for view in 1..latest_complete {
367                        // Ensure only one payload proposed per view
368                        let Some(payloads) = notarizes.get(&view) else {
369                            exceptions += 1;
370                            continue;
371                        };
372                        if payloads.len() > 1 {
373                            panic!("view: {view}");
374                        }
375                        let (digest, notarizers) = payloads.iter().next().unwrap();
376                        notarized.insert(view, *digest);
377
378                        if notarizers.len() < threshold as usize {
379                            // We can't verify that everyone participated at every view because some nodes may
380                            // have started later.
381                            panic!("view: {view}");
382                        }
383                        if notarizers.len() != n as usize {
384                            exceptions += 1;
385                        }
386                    }
387                }
388                {
389                    let notarizations = supervisor.notarizations.lock().unwrap();
390                    for view in 1..latest_complete {
391                        // Ensure notarization matches digest from notarizes
392                        let Some(notarization) = notarizations.get(&view) else {
393                            exceptions += 1;
394                            continue;
395                        };
396                        let Some(digest) = notarized.get(&view) else {
397                            exceptions += 1;
398                            continue;
399                        };
400                        assert_eq!(&notarization.proposal.payload, digest);
401                    }
402                }
403                {
404                    let finalizes = supervisor.finalizes.lock().unwrap();
405                    for view in 1..latest_complete {
406                        // Ensure only one payload proposed per view
407                        let Some(payloads) = finalizes.get(&view) else {
408                            exceptions += 1;
409                            continue;
410                        };
411                        if payloads.len() > 1 {
412                            panic!("view: {view}");
413                        }
414                        let (digest, finalizers) = payloads.iter().next().unwrap();
415                        finalized.insert(view, *digest);
416
417                        // Only check at views below timeout
418                        if view > latest_complete {
419                            continue;
420                        }
421
422                        // Ensure everyone participating
423                        if finalizers.len() < threshold as usize {
424                            // We can't verify that everyone participated at every view because some nodes may
425                            // have started later.
426                            panic!("view: {view}");
427                        }
428                        if finalizers.len() != n as usize {
429                            exceptions += 1;
430                        }
431
432                        // Ensure no nullifies for any finalizers
433                        let nullifies = supervisor.nullifies.lock().unwrap();
434                        let Some(nullifies) = nullifies.get(&view) else {
435                            continue;
436                        };
437                        for (_, finalizers) in payloads.iter() {
438                            for finalizer in finalizers.iter() {
439                                if nullifies.contains(finalizer) {
440                                    panic!("should not nullify and finalize at same view");
441                                }
442                            }
443                        }
444                    }
445                }
446                {
447                    let finalizations = supervisor.finalizations.lock().unwrap();
448                    for view in 1..latest_complete {
449                        // Ensure finalization matches digest from finalizes
450                        let Some(finalization) = finalizations.get(&view) else {
451                            exceptions += 1;
452                            continue;
453                        };
454                        let Some(digest) = finalized.get(&view) else {
455                            exceptions += 1;
456                            continue;
457                        };
458                        assert_eq!(&finalization.proposal.payload, digest);
459                    }
460                }
461
462                // Ensure exceptions within allowed
463                assert!(exceptions <= max_exceptions);
464            }
465        });
466    }
467
468    #[test_traced]
469    fn test_unclean_shutdown() {
470        // Create context
471        let n = 5;
472        let required_containers = 100;
473        let activity_timeout = 10;
474        let skip_timeout = 5;
475        let namespace = b"consensus".to_vec();
476
477        // Random restarts every x seconds
478        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
479        let supervised = Arc::new(Mutex::new(Vec::new()));
480        let mut prev_ctx = None;
481
482        loop {
483            let namespace = namespace.clone();
484            let shutdowns = shutdowns.clone();
485            let supervised = supervised.clone();
486
487            let f = |mut context: deterministic::Context| async move {
488                // Create simulated network
489                let (network, mut oracle) = Network::new(
490                    context.with_label("network"),
491                    Config {
492                        max_size: 1024 * 1024,
493                    },
494                );
495
496                // Start network
497                network.start();
498
499                // Register participants
500                let mut schemes = Vec::new();
501                let mut validators = Vec::new();
502                for i in 0..n {
503                    let scheme = PrivateKey::from_seed(i as u64);
504                    let pk = scheme.public_key();
505                    schemes.push(scheme);
506                    validators.push(pk);
507                }
508                validators.sort();
509                schemes.sort_by_key(|s| s.public_key());
510                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
511                let mut registrations = register_validators(&mut oracle, &validators).await;
512
513                // Link all validators
514                let link = Link {
515                    latency: 50.0,
516                    jitter: 50.0,
517                    success_rate: 1.0,
518                };
519                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
520
521                // Create engines
522                let relay = Arc::new(mocks::relay::Relay::new());
523                let mut supervisors = HashMap::new();
524                let mut engine_handlers = Vec::new();
525                for scheme in schemes.into_iter() {
526                    // Create scheme context
527                    let context = context
528                        .clone()
529                        .with_label(&format!("validator-{}", scheme.public_key()));
530
531                    // Start engine
532                    let validator = scheme.public_key();
533                    let supervisor_config = mocks::supervisor::Config {
534                        namespace: namespace.clone(),
535                        participants: view_validators.clone(),
536                    };
537                    let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
538                        supervisor_config,
539                    );
540                    supervisors.insert(validator.clone(), supervisor.clone());
541                    let application_cfg = mocks::application::Config {
542                        hasher: Sha256::default(),
543                        relay: relay.clone(),
544                        participant: validator.clone(),
545                        propose_latency: (10.0, 5.0),
546                        verify_latency: (10.0, 5.0),
547                    };
548                    let (actor, application) = mocks::application::Application::new(
549                        context.with_label("application"),
550                        application_cfg,
551                    );
552                    actor.start();
553                    let cfg = config::Config {
554                        crypto: scheme,
555                        automaton: application.clone(),
556                        relay: application.clone(),
557                        reporter: supervisor.clone(),
558                        supervisor,
559                        partition: validator.to_string(),
560                        compression: Some(3),
561                        mailbox_size: 1024,
562                        namespace: namespace.clone(),
563                        leader_timeout: Duration::from_secs(1),
564                        notarization_timeout: Duration::from_secs(2),
565                        nullify_retry: Duration::from_secs(10),
566                        fetch_timeout: Duration::from_secs(1),
567                        activity_timeout,
568                        skip_timeout,
569                        max_participants: n as usize,
570                        max_fetch_count: 1,
571                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
572                        fetch_concurrent: 1,
573                        replay_buffer: 1024 * 1024,
574                        write_buffer: 1024 * 1024,
575                    };
576                    let engine = Engine::new(context.with_label("engine"), cfg);
577                    let (voter_network, resolver_network) = registrations
578                        .remove(&validator)
579                        .expect("validator should be registered");
580                    engine_handlers.push(engine.start(voter_network, resolver_network));
581                }
582
583                // Store all finalizer handles
584                let mut finalizers = Vec::new();
585                for (_, supervisor) in supervisors.iter_mut() {
586                    let (mut latest, mut monitor) = supervisor.subscribe().await;
587                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
588                        while latest < required_containers {
589                            latest = monitor.next().await.expect("event missing");
590                        }
591                    }));
592                }
593
594                // Exit at random points for unclean shutdown of entire set
595                let wait =
596                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
597                select! {
598                    _ = context.sleep(wait) => {
599                        // Collect supervisors to check faults
600                        {
601                            let mut shutdowns = shutdowns.lock().unwrap();
602                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
603                            *shutdowns += 1;
604                        }
605                        supervised.lock().unwrap().push(supervisors);
606                        (false,context)
607                    },
608                    _ = join_all(finalizers) => {
609                        // Check supervisors for faults activity
610                        let supervised = supervised.lock().unwrap();
611                        for supervisors in supervised.iter() {
612                            for (_, supervisor) in supervisors.iter() {
613                                let faults = supervisor.faults.lock().unwrap();
614                                assert!(faults.is_empty());
615                            }
616                        }
617                        (true,context)
618                    }
619                }
620            };
621
622            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
623                deterministic::Runner::from(prev_ctx)
624            } else {
625                deterministic::Runner::timed(Duration::from_secs(30))
626            }
627            .start(f);
628
629            // If we are done, break
630            if complete {
631                break;
632            }
633
634            prev_ctx = Some(context.recover());
635        }
636    }
637
638    #[test_traced]
639    fn test_backfill() {
640        // Create context
641        let n = 4;
642        let required_containers = 100;
643        let activity_timeout = 10;
644        let skip_timeout = 5;
645        let namespace = b"consensus".to_vec();
646        let executor = deterministic::Runner::timed(Duration::from_secs(360));
647        executor.start(|context| async move {
648            // Create simulated network
649            let (network, mut oracle) = Network::new(
650                context.with_label("network"),
651                Config {
652                    max_size: 1024 * 1024,
653                },
654            );
655
656            // Start network
657            network.start();
658
659            // Register participants
660            let mut schemes = Vec::new();
661            let mut validators = Vec::new();
662            for i in 0..n {
663                let scheme = PrivateKey::from_seed(i as u64);
664                let pk = scheme.public_key();
665                schemes.push(scheme);
666                validators.push(pk);
667            }
668            validators.sort();
669            schemes.sort_by_key(|s| s.public_key());
670            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
671            let mut registrations = register_validators(&mut oracle, &validators).await;
672
673            // Link all validators except first
674            let link = Link {
675                latency: 10.0,
676                jitter: 1.0,
677                success_rate: 1.0,
678            };
679            link_validators(
680                &mut oracle,
681                &validators,
682                Action::Link(link),
683                Some(|_, i, j| ![i, j].contains(&0usize)),
684            )
685            .await;
686
687            // Create engines
688            let relay = Arc::new(mocks::relay::Relay::new());
689            let mut supervisors = Vec::new();
690            let mut engine_handlers = Vec::new();
691            for (idx_scheme, scheme) in schemes.iter().enumerate() {
692                // Skip first peer
693                if idx_scheme == 0 {
694                    continue;
695                }
696
697                // Create scheme context
698                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
699
700                // Start engine
701                let validator = scheme.public_key();
702                let supervisor_config = mocks::supervisor::Config {
703                    namespace: namespace.clone(),
704                    participants: view_validators.clone(),
705                };
706                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
707                    supervisor_config,
708                );
709                supervisors.push(supervisor.clone());
710                let application_cfg = mocks::application::Config {
711                    hasher: Sha256::default(),
712                    relay: relay.clone(),
713                    participant: validator.clone(),
714                    propose_latency: (10.0, 5.0),
715                    verify_latency: (10.0, 5.0),
716                };
717                let (actor, application) = mocks::application::Application::new(
718                    context.with_label("application"),
719                    application_cfg,
720                );
721                actor.start();
722                let cfg = config::Config {
723                    crypto: scheme.clone(),
724                    automaton: application.clone(),
725                    relay: application.clone(),
726                    reporter: supervisor.clone(),
727                    supervisor,
728                    partition: validator.to_string(),
729                    compression: Some(3),
730                    mailbox_size: 1024,
731                    namespace: namespace.clone(),
732                    leader_timeout: Duration::from_secs(1),
733                    notarization_timeout: Duration::from_secs(2),
734                    nullify_retry: Duration::from_secs(10),
735                    fetch_timeout: Duration::from_secs(1),
736                    activity_timeout,
737                    skip_timeout,
738                    max_fetch_count: 1, // force many fetches
739                    max_participants: n as usize,
740                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
741                    fetch_concurrent: 1,
742                    replay_buffer: 1024 * 1024,
743                    write_buffer: 1024 * 1024,
744                };
745                let (voter, resolver) = registrations
746                    .remove(&validator)
747                    .expect("validator should be registered");
748                let engine = Engine::new(context.with_label("engine"), cfg);
749                engine_handlers.push(engine.start(voter, resolver));
750            }
751
752            // Wait for all engines to finish
753            let mut finalizers = Vec::new();
754            for supervisor in supervisors.iter_mut() {
755                let (mut latest, mut monitor) = supervisor.subscribe().await;
756                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
757                    while latest < required_containers {
758                        latest = monitor.next().await.expect("event missing");
759                    }
760                }));
761            }
762            join_all(finalizers).await;
763
764            // Degrade network connections for online peers
765            let link = Link {
766                latency: 3_000.0,
767                jitter: 0.0,
768                success_rate: 1.0,
769            };
770            link_validators(
771                &mut oracle,
772                &validators,
773                Action::Update(link.clone()),
774                Some(|_, i, j| ![i, j].contains(&0usize)),
775            )
776            .await;
777
778            // Wait for nullifications to accrue
779            context.sleep(Duration::from_secs(120)).await;
780
781            // Unlink second peer from all (except first)
782            link_validators(
783                &mut oracle,
784                &validators,
785                Action::Unlink,
786                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
787            )
788            .await;
789
790            // Start engine for first peer
791            let scheme = schemes[0].clone();
792            let validator = scheme.public_key();
793
794            // Create scheme context
795            let context = context.with_label(&format!("validator-{}", scheme.public_key()));
796
797            // Link first peer to all (except second)
798            link_validators(
799                &mut oracle,
800                &validators,
801                Action::Link(link),
802                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
803            )
804            .await;
805
806            // Restore network connections for all online peers
807            let link = Link {
808                latency: 10.0,
809                jitter: 2.5,
810                success_rate: 1.0,
811            };
812            link_validators(
813                &mut oracle,
814                &validators,
815                Action::Update(link),
816                Some(|_, i, j| ![i, j].contains(&1usize)),
817            )
818            .await;
819
820            // Start engine
821            let supervisor_config = mocks::supervisor::Config {
822                namespace: namespace.clone(),
823                participants: view_validators.clone(),
824            };
825            let mut supervisor =
826                mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
827            supervisors.push(supervisor.clone());
828            let application_cfg = mocks::application::Config {
829                hasher: Sha256::default(),
830                relay: relay.clone(),
831                participant: validator.clone(),
832                propose_latency: (10.0, 5.0),
833                verify_latency: (10.0, 5.0),
834            };
835            let (actor, application) = mocks::application::Application::new(
836                context.with_label("application"),
837                application_cfg,
838            );
839            actor.start();
840            let cfg = config::Config {
841                crypto: scheme,
842                automaton: application.clone(),
843                relay: application.clone(),
844                reporter: supervisor.clone(),
845                supervisor: supervisor.clone(),
846                partition: validator.to_string(),
847                compression: Some(3),
848                mailbox_size: 1024,
849                namespace: namespace.clone(),
850                leader_timeout: Duration::from_secs(1),
851                notarization_timeout: Duration::from_secs(2),
852                nullify_retry: Duration::from_secs(10),
853                fetch_timeout: Duration::from_secs(1),
854                activity_timeout,
855                skip_timeout,
856                max_fetch_count: 1,
857                max_participants: n as usize,
858                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
859                fetch_concurrent: 1,
860                replay_buffer: 1024 * 1024,
861                write_buffer: 1024 * 1024,
862            };
863            let (voter, resolver) = registrations
864                .remove(&validator)
865                .expect("validator should be registered");
866            let engine = Engine::new(context.with_label("engine"), cfg);
867            engine_handlers.push(engine.start(voter, resolver));
868
869            // Wait for new engine to finalize required
870            let (mut latest, mut monitor) = supervisor.subscribe().await;
871            while latest < required_containers {
872                latest = monitor.next().await.expect("event missing");
873            }
874        });
875    }
876
877    #[test_traced]
878    fn test_one_offline() {
879        // Create context
880        let n = 5;
881        let threshold = quorum(n);
882        let required_containers = 100;
883        let activity_timeout = 10;
884        let skip_timeout = 5;
885        let namespace = b"consensus".to_vec();
886        let executor = deterministic::Runner::timed(Duration::from_secs(30));
887        executor.start(|context| async move {
888            // Create simulated network
889            let (network, mut oracle) = Network::new(
890                context.with_label("network"),
891                Config {
892                    max_size: 1024 * 1024,
893                },
894            );
895
896            // Start network
897            network.start();
898
899            // Register participants
900            let mut schemes = Vec::new();
901            let mut validators = Vec::new();
902            for i in 0..n {
903                let scheme = PrivateKey::from_seed(i as u64);
904                let pk = scheme.public_key();
905                schemes.push(scheme);
906                validators.push(pk);
907            }
908            validators.sort();
909            schemes.sort_by_key(|s| s.public_key());
910            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
911            let mut registrations = register_validators(&mut oracle, &validators).await;
912
913            // Link all validators except first
914            let link = Link {
915                latency: 10.0,
916                jitter: 1.0,
917                success_rate: 1.0,
918            };
919            link_validators(
920                &mut oracle,
921                &validators,
922                Action::Link(link),
923                Some(|_, i, j| ![i, j].contains(&0usize)),
924            )
925            .await;
926
927            // Create engines
928            let relay = Arc::new(mocks::relay::Relay::new());
929            let mut supervisors = Vec::new();
930            let mut engine_handlers = Vec::new();
931            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
932                // Skip first peer
933                if idx_scheme == 0 {
934                    continue;
935                }
936
937                // Create scheme context
938                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
939
940                // Start engine
941                let validator = scheme.public_key();
942                let supervisor_config = mocks::supervisor::Config {
943                    namespace: namespace.clone(),
944                    participants: view_validators.clone(),
945                };
946                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
947                    supervisor_config,
948                );
949                supervisors.push(supervisor.clone());
950                let application_cfg = mocks::application::Config {
951                    hasher: Sha256::default(),
952                    relay: relay.clone(),
953                    participant: validator.clone(),
954                    propose_latency: (10.0, 5.0),
955                    verify_latency: (10.0, 5.0),
956                };
957                let (actor, application) = mocks::application::Application::new(
958                    context.with_label("application"),
959                    application_cfg,
960                );
961                actor.start();
962                let cfg = config::Config {
963                    crypto: scheme,
964                    automaton: application.clone(),
965                    relay: application.clone(),
966                    reporter: supervisor.clone(),
967                    partition: validator.to_string(),
968                    compression: Some(3),
969                    supervisor,
970                    mailbox_size: 1024,
971                    namespace: namespace.clone(),
972                    leader_timeout: Duration::from_secs(1),
973                    notarization_timeout: Duration::from_secs(2),
974                    nullify_retry: Duration::from_secs(10),
975                    fetch_timeout: Duration::from_secs(1),
976                    activity_timeout,
977                    skip_timeout,
978                    max_participants: n as usize,
979                    max_fetch_count: 1,
980                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
981                    fetch_concurrent: 1,
982                    replay_buffer: 1024 * 1024,
983                    write_buffer: 1024 * 1024,
984                };
985                let (voter, resolver) = registrations
986                    .remove(&validator)
987                    .expect("validator should be registered");
988                let engine = Engine::new(context.with_label("engine"), cfg);
989                engine_handlers.push(engine.start(voter, resolver));
990            }
991
992            // Wait for all engines to finish
993            let mut finalizers = Vec::new();
994            for supervisor in supervisors.iter_mut() {
995                let (mut latest, mut monitor) = supervisor.subscribe().await;
996                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
997                    while latest < required_containers {
998                        latest = monitor.next().await.expect("event missing");
999                    }
1000                }));
1001            }
1002            join_all(finalizers).await;
1003
1004            // Check supervisors for correct activity
1005            let offline = &validators[0];
1006            for supervisor in supervisors.iter() {
1007                // Ensure no faults
1008                {
1009                    let faults = supervisor.faults.lock().unwrap();
1010                    assert!(faults.is_empty());
1011                }
1012
1013                // Ensure offline node is never active
1014                {
1015                    let notarizes = supervisor.notarizes.lock().unwrap();
1016                    for (view, payloads) in notarizes.iter() {
1017                        for (_, participants) in payloads.iter() {
1018                            if participants.contains(offline) {
1019                                panic!("view: {view}");
1020                            }
1021                        }
1022                    }
1023                }
1024                {
1025                    let nullifies = supervisor.nullifies.lock().unwrap();
1026                    for (view, participants) in nullifies.iter() {
1027                        if participants.contains(offline) {
1028                            panic!("view: {view}");
1029                        }
1030                    }
1031                }
1032                {
1033                    let finalizes = supervisor.finalizes.lock().unwrap();
1034                    for (view, payloads) in finalizes.iter() {
1035                        for (_, finalizers) in payloads.iter() {
1036                            if finalizers.contains(offline) {
1037                                panic!("view: {view}");
1038                            }
1039                        }
1040                    }
1041                }
1042
1043                // Identify offline views
1044                let mut offline_views = Vec::new();
1045                {
1046                    let leaders = supervisor.leaders.lock().unwrap();
1047                    for (view, leader) in leaders.iter() {
1048                        if leader == offline {
1049                            offline_views.push(*view);
1050                        }
1051                    }
1052                }
1053                assert!(!offline_views.is_empty());
1054
1055                // Ensure nullifies/nullification collected for offline node
1056                {
1057                    let nullifies = supervisor.nullifies.lock().unwrap();
1058                    for view in offline_views.iter() {
1059                        let nullifies = nullifies.get(view).unwrap();
1060                        if nullifies.len() < threshold as usize {
1061                            panic!("view: {view}");
1062                        }
1063                    }
1064                }
1065                {
1066                    let nullifications = supervisor.nullifications.lock().unwrap();
1067                    for view in offline_views.iter() {
1068                        nullifications.get(view).unwrap();
1069                    }
1070                }
1071            }
1072
1073            // Ensure we are skipping views
1074            let encoded = context.encode();
1075            let lines = encoded.lines();
1076            let mut skipped_views = 0;
1077            let mut nodes_skipping = 0;
1078            for line in lines {
1079                if line.contains("_engine_voter_skipped_views_total") {
1080                    let parts: Vec<&str> = line.split_whitespace().collect();
1081                    if let Some(number_str) = parts.last() {
1082                        if let Ok(number) = number_str.parse::<u64>() {
1083                            if number > 0 {
1084                                nodes_skipping += 1;
1085                            }
1086                            if number > skipped_views {
1087                                skipped_views = number;
1088                            }
1089                        }
1090                    }
1091                }
1092            }
1093            assert!(
1094                skipped_views > 0,
1095                "expected skipped views to be greater than 0"
1096            );
1097            assert_eq!(
1098                nodes_skipping,
1099                n - 1,
1100                "expected all online nodes to be skipping views"
1101            );
1102        });
1103    }
1104
1105    #[test_traced]
1106    fn test_slow_validator() {
1107        // Create context
1108        let n = 5;
1109        let required_containers = 50;
1110        let activity_timeout = 10;
1111        let skip_timeout = 5;
1112        let namespace = b"consensus".to_vec();
1113        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1114        executor.start(|context| async move {
1115            // Create simulated network
1116            let (network, mut oracle) = Network::new(
1117                context.with_label("network"),
1118                Config {
1119                    max_size: 1024 * 1024,
1120                },
1121            );
1122
1123            // Start network
1124            network.start();
1125
1126            // Register participants
1127            let mut schemes = Vec::new();
1128            let mut validators = Vec::new();
1129            for i in 0..n {
1130                let scheme = PrivateKey::from_seed(i as u64);
1131                let pk = scheme.public_key();
1132                schemes.push(scheme);
1133                validators.push(pk);
1134            }
1135            validators.sort();
1136            schemes.sort_by_key(|s| s.public_key());
1137            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1138            let mut registrations = register_validators(&mut oracle, &validators).await;
1139
1140            // Link all validators
1141            let link = Link {
1142                latency: 10.0,
1143                jitter: 1.0,
1144                success_rate: 1.0,
1145            };
1146            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1147
1148            // Create engines
1149            let relay = Arc::new(mocks::relay::Relay::new());
1150            let mut supervisors = Vec::new();
1151            let mut engine_handlers = Vec::new();
1152            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1153                // Create scheme context
1154                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1155
1156                // Start engine
1157                let validator = scheme.public_key();
1158                let supervisor_config = mocks::supervisor::Config {
1159                    namespace: namespace.clone(),
1160                    participants: view_validators.clone(),
1161                };
1162                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1163                    supervisor_config,
1164                );
1165                supervisors.push(supervisor.clone());
1166                let application_cfg = if idx_scheme == 0 {
1167                    mocks::application::Config {
1168                        hasher: Sha256::default(),
1169                        relay: relay.clone(),
1170                        participant: validator.clone(),
1171                        propose_latency: (3_000.0, 0.0),
1172                        verify_latency: (3_000.0, 5.0),
1173                    }
1174                } else {
1175                    mocks::application::Config {
1176                        hasher: Sha256::default(),
1177                        relay: relay.clone(),
1178                        participant: validator.clone(),
1179                        propose_latency: (10.0, 5.0),
1180                        verify_latency: (10.0, 5.0),
1181                    }
1182                };
1183                let (actor, application) = mocks::application::Application::new(
1184                    context.with_label("application"),
1185                    application_cfg,
1186                );
1187                actor.start();
1188                let cfg = config::Config {
1189                    crypto: scheme,
1190                    automaton: application.clone(),
1191                    relay: application.clone(),
1192                    reporter: supervisor.clone(),
1193                    partition: validator.to_string(),
1194                    compression: Some(3),
1195                    supervisor,
1196                    mailbox_size: 1024,
1197                    namespace: namespace.clone(),
1198                    leader_timeout: Duration::from_secs(1),
1199                    notarization_timeout: Duration::from_secs(2),
1200                    nullify_retry: Duration::from_secs(10),
1201                    fetch_timeout: Duration::from_secs(1),
1202                    activity_timeout,
1203                    skip_timeout,
1204                    max_fetch_count: 1,
1205                    max_participants: n as usize,
1206                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1207                    fetch_concurrent: 1,
1208                    replay_buffer: 1024 * 1024,
1209                    write_buffer: 1024 * 1024,
1210                };
1211                let (voter, resolver) = registrations
1212                    .remove(&validator)
1213                    .expect("validator should be registered");
1214                let engine = Engine::new(context.with_label("engine"), cfg);
1215                engine_handlers.push(engine.start(voter, resolver));
1216            }
1217
1218            // Wait for all engines to finish
1219            let mut finalizers = Vec::new();
1220            for supervisor in supervisors.iter_mut() {
1221                let (mut latest, mut monitor) = supervisor.subscribe().await;
1222                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1223                    while latest < required_containers {
1224                        latest = monitor.next().await.expect("event missing");
1225                    }
1226                }));
1227            }
1228            join_all(finalizers).await;
1229
1230            // Check supervisors for correct activity
1231            let slow = &validators[0];
1232            for supervisor in supervisors.iter() {
1233                // Ensure no faults
1234                {
1235                    let faults = supervisor.faults.lock().unwrap();
1236                    assert!(faults.is_empty());
1237                }
1238
1239                // Ensure slow node is never active (will never process anything fast enough to nullify)
1240                {
1241                    let notarizes = supervisor.notarizes.lock().unwrap();
1242                    for (view, payloads) in notarizes.iter() {
1243                        for (_, participants) in payloads.iter() {
1244                            if participants.contains(slow) {
1245                                panic!("view: {view}");
1246                            }
1247                        }
1248                    }
1249                }
1250                {
1251                    let nullifies = supervisor.nullifies.lock().unwrap();
1252                    for (view, participants) in nullifies.iter() {
1253                        // Start checking once all are online (leader may never have proposed)
1254                        if *view > 10 && participants.contains(slow) {
1255                            panic!("view: {view}");
1256                        }
1257                    }
1258                }
1259                {
1260                    let finalizes = supervisor.finalizes.lock().unwrap();
1261                    for (view, payloads) in finalizes.iter() {
1262                        for (_, finalizers) in payloads.iter() {
1263                            if finalizers.contains(slow) {
1264                                panic!("view: {view}");
1265                            }
1266                        }
1267                    }
1268                }
1269            }
1270        });
1271    }
1272
1273    #[test_traced]
1274    fn test_all_recovery() {
1275        // Create context
1276        let n = 5;
1277        let required_containers = 100;
1278        let activity_timeout = 10;
1279        let skip_timeout = 2;
1280        let namespace = b"consensus".to_vec();
1281        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1282        executor.start(|context| async move {
1283            // Create simulated network
1284            let (network, mut oracle) = Network::new(
1285                context.with_label("network"),
1286                Config {
1287                    max_size: 1024 * 1024,
1288                },
1289            );
1290
1291            // Start network
1292            network.start();
1293
1294            // Register participants
1295            let mut schemes = Vec::new();
1296            let mut validators = Vec::new();
1297            for i in 0..n {
1298                let scheme = PrivateKey::from_seed(i as u64);
1299                let pk = scheme.public_key();
1300                schemes.push(scheme);
1301                validators.push(pk);
1302            }
1303            validators.sort();
1304            schemes.sort_by_key(|s| s.public_key());
1305            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1306            let mut registrations = register_validators(&mut oracle, &validators).await;
1307
1308            // Link all validators
1309            let link = Link {
1310                latency: 3_000.0,
1311                jitter: 0.0,
1312                success_rate: 1.0,
1313            };
1314            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1315
1316            // Create engines
1317            let relay = Arc::new(mocks::relay::Relay::new());
1318            let mut supervisors = Vec::new();
1319            let mut engine_handlers = Vec::new();
1320            for scheme in schemes.iter() {
1321                // Create scheme context
1322                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1323
1324                // Start engine
1325                let validator = scheme.public_key();
1326                let supervisor_config = mocks::supervisor::Config {
1327                    namespace: namespace.clone(),
1328                    participants: view_validators.clone(),
1329                };
1330                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1331                    supervisor_config,
1332                );
1333                supervisors.push(supervisor.clone());
1334                let application_cfg = mocks::application::Config {
1335                    hasher: Sha256::default(),
1336                    relay: relay.clone(),
1337                    participant: validator.clone(),
1338                    propose_latency: (10.0, 5.0),
1339                    verify_latency: (10.0, 5.0),
1340                };
1341                let (actor, application) = mocks::application::Application::new(
1342                    context.with_label("application"),
1343                    application_cfg,
1344                );
1345                actor.start();
1346                let cfg = config::Config {
1347                    crypto: scheme.clone(),
1348                    automaton: application.clone(),
1349                    relay: application.clone(),
1350                    reporter: supervisor.clone(),
1351                    partition: validator.to_string(),
1352                    compression: Some(3),
1353                    supervisor,
1354                    mailbox_size: 1024,
1355                    namespace: namespace.clone(),
1356                    leader_timeout: Duration::from_secs(1),
1357                    notarization_timeout: Duration::from_secs(2),
1358                    nullify_retry: Duration::from_secs(10),
1359                    fetch_timeout: Duration::from_secs(1),
1360                    activity_timeout,
1361                    skip_timeout,
1362                    max_fetch_count: 1,
1363                    max_participants: n as usize,
1364                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1365                    fetch_concurrent: 1,
1366                    replay_buffer: 1024 * 1024,
1367                    write_buffer: 1024 * 1024,
1368                };
1369                let (voter, resolver) = registrations
1370                    .remove(&validator)
1371                    .expect("validator should be registered");
1372                let engine = Engine::new(context.with_label("engine"), cfg);
1373                engine_handlers.push(engine.start(voter, resolver));
1374            }
1375
1376            // Wait for a few virtual minutes (shouldn't finalize anything)
1377            let mut finalizers = Vec::new();
1378            for supervisor in supervisors.iter_mut() {
1379                let (_, mut monitor) = supervisor.subscribe().await;
1380                finalizers.push(
1381                    context
1382                        .with_label("finalizer")
1383                        .spawn(move |context| async move {
1384                            select! {
1385                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1386                                _done = monitor.next() => {
1387                                    panic!("engine should not notarize or finalize anything");
1388                                }
1389                            }
1390                        }),
1391                );
1392            }
1393            join_all(finalizers).await;
1394
1395            // Unlink all validators to get latest view
1396            link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1397
1398            // Wait for a virtual minute (nothing should happen)
1399            context.sleep(Duration::from_secs(60)).await;
1400
1401            // Get latest view
1402            let mut latest = 0;
1403            for supervisor in supervisors.iter() {
1404                let nullifies = supervisor.nullifies.lock().unwrap();
1405                let max = nullifies.keys().max().unwrap();
1406                if *max > latest {
1407                    latest = *max;
1408                }
1409            }
1410
1411            // Update links
1412            let link = Link {
1413                latency: 10.0,
1414                jitter: 1.0,
1415                success_rate: 1.0,
1416            };
1417            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1418
1419            // Wait for all engines to finish
1420            let mut finalizers = Vec::new();
1421            for supervisor in supervisors.iter_mut() {
1422                let (mut latest, mut monitor) = supervisor.subscribe().await;
1423                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1424                    while latest < required_containers {
1425                        latest = monitor.next().await.expect("event missing");
1426                    }
1427                }));
1428            }
1429            join_all(finalizers).await;
1430
1431            // Check supervisors for correct activity
1432            for supervisor in supervisors.iter() {
1433                // Ensure no faults
1434                {
1435                    let faults = supervisor.faults.lock().unwrap();
1436                    assert!(faults.is_empty());
1437                }
1438
1439                // Ensure quick recovery.
1440                //
1441                // If the skip timeout isn't implemented correctly, we may go many views before participants
1442                // start to consider a validator's proposal.
1443                {
1444                    // Ensure nearly all views around latest finalize
1445                    let mut found = 0;
1446                    let finalizations = supervisor.finalizations.lock().unwrap();
1447                    for i in latest..latest + activity_timeout {
1448                        if finalizations.contains_key(&i) {
1449                            found += 1;
1450                        }
1451                    }
1452                    assert!(found >= activity_timeout - 2, "found: {found}");
1453                }
1454            }
1455        });
1456    }
1457
1458    #[test_traced]
1459    #[ignore]
1460    fn test_partition() {
1461        // Create context
1462        let n = 10;
1463        let required_containers = 50;
1464        let activity_timeout = 10;
1465        let skip_timeout = 5;
1466        let namespace = b"consensus".to_vec();
1467        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1468        executor.start(|context| async move {
1469            // Create simulated network
1470            let (network, mut oracle) = Network::new(
1471                context.with_label("network"),
1472                Config {
1473                    max_size: 1024 * 1024,
1474                },
1475            );
1476
1477            // Start network
1478            network.start();
1479
1480            // Register participants
1481            let mut schemes = Vec::new();
1482            let mut validators = Vec::new();
1483            for i in 0..n {
1484                let scheme = PrivateKey::from_seed(i as u64);
1485                let pk = scheme.public_key();
1486                schemes.push(scheme);
1487                validators.push(pk);
1488            }
1489            validators.sort();
1490            schemes.sort_by_key(|s| s.public_key());
1491            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1492            let mut registrations = register_validators(&mut oracle, &validators).await;
1493
1494            // Link all validators
1495            let link = Link {
1496                latency: 10.0,
1497                jitter: 1.0,
1498                success_rate: 1.0,
1499            };
1500            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1501
1502            // Create engines
1503            let relay = Arc::new(mocks::relay::Relay::new());
1504            let mut supervisors = Vec::new();
1505            let mut engine_handlers = Vec::new();
1506            for scheme in schemes.iter() {
1507                // Start engine
1508                let validator = scheme.public_key();
1509                let supervisor_config = mocks::supervisor::Config {
1510                    namespace: namespace.clone(),
1511                    participants: view_validators.clone(),
1512                };
1513                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1514                    supervisor_config,
1515                );
1516                supervisors.push(supervisor.clone());
1517                let application_cfg = mocks::application::Config {
1518                    hasher: Sha256::default(),
1519                    relay: relay.clone(),
1520                    participant: validator.clone(),
1521                    propose_latency: (10.0, 5.0),
1522                    verify_latency: (10.0, 5.0),
1523                };
1524                let (actor, application) = mocks::application::Application::new(
1525                    context.with_label("application"),
1526                    application_cfg,
1527                );
1528                actor.start();
1529                let cfg = config::Config {
1530                    crypto: scheme.clone(),
1531                    automaton: application.clone(),
1532                    relay: application.clone(),
1533                    reporter: supervisor.clone(),
1534                    partition: validator.to_string(),
1535                    compression: Some(3),
1536                    supervisor,
1537                    mailbox_size: 1024,
1538                    namespace: namespace.clone(),
1539                    leader_timeout: Duration::from_secs(1),
1540                    notarization_timeout: Duration::from_secs(2),
1541                    nullify_retry: Duration::from_secs(10),
1542                    fetch_timeout: Duration::from_secs(1),
1543                    activity_timeout,
1544                    skip_timeout,
1545                    max_fetch_count: 1,
1546                    max_participants: n as usize,
1547                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1548                    fetch_concurrent: 1,
1549                    replay_buffer: 1024 * 1024,
1550                    write_buffer: 1024 * 1024,
1551                };
1552                let (voter, resolver) = registrations
1553                    .remove(&validator)
1554                    .expect("validator should be registered");
1555                let engine = Engine::new(context.with_label("engine"), cfg);
1556                engine_handlers.push(engine.start(voter, resolver));
1557            }
1558
1559            // Wait for all engines to finish
1560            let mut finalizers = Vec::new();
1561            for supervisor in supervisors.iter_mut() {
1562                let (mut latest, mut monitor) = supervisor.subscribe().await;
1563                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1564                    while latest < required_containers {
1565                        latest = monitor.next().await.expect("event missing");
1566                    }
1567                }));
1568            }
1569            join_all(finalizers).await;
1570
1571            // Cut all links between validator halves
1572            fn separated(n: usize, a: usize, b: usize) -> bool {
1573                let m = n / 2;
1574                (a < m && b >= m) || (a >= m && b < m)
1575            }
1576            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1577
1578            // Wait for any in-progress notarizations/finalizations to finish
1579            context.sleep(Duration::from_secs(10)).await;
1580
1581            // Wait for a few virtual minutes (shouldn't finalize anything)
1582            let mut finalizers = Vec::new();
1583            for supervisor in supervisors.iter_mut() {
1584                let (_, mut monitor) = supervisor.subscribe().await;
1585                finalizers.push(
1586                    context
1587                        .with_label("finalizer")
1588                        .spawn(move |context| async move {
1589                            select! {
1590                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1591                                _done = monitor.next() => {
1592                                    panic!("engine should not notarize or finalize anything");
1593                                }
1594                            }
1595                        }),
1596                );
1597            }
1598            join_all(finalizers).await;
1599
1600            // Restore links
1601            link_validators(
1602                &mut oracle,
1603                &validators,
1604                Action::Link(link),
1605                Some(separated),
1606            )
1607            .await;
1608
1609            // Wait for all engines to finish
1610            let mut finalizers = Vec::new();
1611            for supervisor in supervisors.iter_mut() {
1612                let (mut latest, mut monitor) = supervisor.subscribe().await;
1613                let required = latest + required_containers;
1614                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1615                    while latest < required {
1616                        latest = monitor.next().await.expect("event missing");
1617                    }
1618                }));
1619            }
1620            join_all(finalizers).await;
1621
1622            // Check supervisors for correct activity
1623            for supervisor in supervisors.iter() {
1624                // Ensure no faults
1625                {
1626                    let faults = supervisor.faults.lock().unwrap();
1627                    assert!(faults.is_empty());
1628                }
1629            }
1630        });
1631    }
1632
1633    fn slow_and_lossy_links(seed: u64) -> String {
1634        // Create context
1635        let n = 5;
1636        let required_containers = 50;
1637        let activity_timeout = 10;
1638        let skip_timeout = 5;
1639        let namespace = b"consensus".to_vec();
1640        let cfg = deterministic::Config::new()
1641            .with_seed(seed)
1642            .with_timeout(Some(Duration::from_secs(5_000)));
1643        let executor = deterministic::Runner::new(cfg);
1644        executor.start(|context| async move {
1645            // Create simulated network
1646            let (network, mut oracle) = Network::new(
1647                context.with_label("network"),
1648                Config {
1649                    max_size: 1024 * 1024,
1650                },
1651            );
1652
1653            // Start network
1654            network.start();
1655
1656            // Register participants
1657            let mut schemes = Vec::new();
1658            let mut validators = Vec::new();
1659            for i in 0..n {
1660                let scheme = PrivateKey::from_seed(i as u64);
1661                let pk = scheme.public_key();
1662                schemes.push(scheme);
1663                validators.push(pk);
1664            }
1665            validators.sort();
1666            schemes.sort_by_key(|s| s.public_key());
1667            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1668            let mut registrations = register_validators(&mut oracle, &validators).await;
1669
1670            // Link all validators
1671            let degraded_link = Link {
1672                latency: 200.0,
1673                jitter: 150.0,
1674                success_rate: 0.5,
1675            };
1676            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1677
1678            // Create engines
1679            let relay = Arc::new(mocks::relay::Relay::new());
1680            let mut supervisors = Vec::new();
1681            let mut engine_handlers = Vec::new();
1682            for scheme in schemes.into_iter() {
1683                // Create scheme context
1684                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1685
1686                // Start engine
1687                let validator = scheme.public_key();
1688                let supervisor_config = mocks::supervisor::Config {
1689                    namespace: namespace.clone(),
1690                    participants: view_validators.clone(),
1691                };
1692                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1693                    supervisor_config,
1694                );
1695                supervisors.push(supervisor.clone());
1696                let application_cfg = mocks::application::Config {
1697                    hasher: Sha256::default(),
1698                    relay: relay.clone(),
1699                    participant: validator.clone(),
1700                    propose_latency: (10.0, 5.0),
1701                    verify_latency: (10.0, 5.0),
1702                };
1703                let (actor, application) = mocks::application::Application::new(
1704                    context.with_label("application"),
1705                    application_cfg,
1706                );
1707                actor.start();
1708                let cfg = config::Config {
1709                    crypto: scheme,
1710                    automaton: application.clone(),
1711                    relay: application.clone(),
1712                    reporter: supervisor.clone(),
1713                    partition: validator.to_string(),
1714                    compression: Some(3),
1715                    supervisor,
1716                    mailbox_size: 1024,
1717                    namespace: namespace.clone(),
1718                    leader_timeout: Duration::from_secs(1),
1719                    notarization_timeout: Duration::from_secs(2),
1720                    nullify_retry: Duration::from_secs(10),
1721                    fetch_timeout: Duration::from_secs(1),
1722                    activity_timeout,
1723                    skip_timeout,
1724                    max_fetch_count: 1,
1725                    max_participants: n as usize,
1726                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1727                    fetch_concurrent: 1,
1728                    replay_buffer: 1024 * 1024,
1729                    write_buffer: 1024 * 1024,
1730                };
1731                let (voter, resolver) = registrations
1732                    .remove(&validator)
1733                    .expect("validator should be registered");
1734                let engine = Engine::new(context.with_label("engine"), cfg);
1735                engine_handlers.push(engine.start(voter, resolver));
1736            }
1737
1738            // Wait for all engines to finish
1739            let mut finalizers = Vec::new();
1740            for supervisor in supervisors.iter_mut() {
1741                let (mut latest, mut monitor) = supervisor.subscribe().await;
1742                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1743                    while latest < required_containers {
1744                        latest = monitor.next().await.expect("event missing");
1745                    }
1746                }));
1747            }
1748            join_all(finalizers).await;
1749
1750            // Check supervisors for correct activity
1751            for supervisor in supervisors.iter() {
1752                // Ensure no faults
1753                {
1754                    let faults = supervisor.faults.lock().unwrap();
1755                    assert!(faults.is_empty());
1756                }
1757            }
1758
1759            context.auditor().state()
1760        })
1761    }
1762
1763    #[test_traced]
1764    fn test_slow_and_lossy_links() {
1765        slow_and_lossy_links(0);
1766    }
1767
1768    #[test_traced]
1769    #[ignore]
1770    fn test_determinism() {
1771        // We use slow and lossy links as the deterministic test
1772        // because it is the most complex test.
1773        for seed in 1..6 {
1774            // Run test with seed
1775            let state_1 = slow_and_lossy_links(seed);
1776
1777            // Run test again with same seed
1778            let state_2 = slow_and_lossy_links(seed);
1779
1780            // Ensure states are equal
1781            assert_eq!(state_1, state_2);
1782        }
1783    }
1784
1785    fn conflicter(seed: u64) {
1786        // Create context
1787        let n = 4;
1788        let required_containers = 50;
1789        let activity_timeout = 10;
1790        let skip_timeout = 5;
1791        let namespace = b"consensus".to_vec();
1792        let cfg = deterministic::Config::new()
1793            .with_seed(seed)
1794            .with_timeout(Some(Duration::from_secs(30)));
1795        let executor = deterministic::Runner::new(cfg);
1796        executor.start(|context| async move {
1797            // Create simulated network
1798            let (network, mut oracle) = Network::new(
1799                context.with_label("network"),
1800                Config {
1801                    max_size: 1024 * 1024,
1802                },
1803            );
1804
1805            // Start network
1806            network.start();
1807
1808            // Register participants
1809            let mut schemes = Vec::new();
1810            let mut validators = Vec::new();
1811            for i in 0..n {
1812                let scheme = PrivateKey::from_seed(i as u64);
1813                let pk = scheme.public_key();
1814                schemes.push(scheme);
1815                validators.push(pk);
1816            }
1817            validators.sort();
1818            schemes.sort_by_key(|s| s.public_key());
1819            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1820            let mut registrations = register_validators(&mut oracle, &validators).await;
1821
1822            // Link all validators
1823            let link = Link {
1824                latency: 10.0,
1825                jitter: 1.0,
1826                success_rate: 1.0,
1827            };
1828            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1829
1830            // Create engines
1831            let relay = Arc::new(mocks::relay::Relay::new());
1832            let mut supervisors = Vec::new();
1833            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1834                // Create scheme context
1835                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1836
1837                // Start engine
1838                let validator = scheme.public_key();
1839                let supervisor_config = mocks::supervisor::Config {
1840                    namespace: namespace.clone(),
1841                    participants: view_validators.clone(),
1842                };
1843                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1844                    supervisor_config,
1845                );
1846                if idx_scheme == 0 {
1847                    let cfg = mocks::conflicter::Config {
1848                        crypto: scheme,
1849                        supervisor,
1850                        namespace: namespace.clone(),
1851                    };
1852                    let (voter, _) = registrations
1853                        .remove(&validator)
1854                        .expect("validator should be registered");
1855                    let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1856                        mocks::conflicter::Conflicter::new(
1857                            context.with_label("byzantine_engine"),
1858                            cfg,
1859                        );
1860                    engine.start(voter);
1861                } else {
1862                    supervisors.push(supervisor.clone());
1863                    let application_cfg = mocks::application::Config {
1864                        hasher: Sha256::default(),
1865                        relay: relay.clone(),
1866                        participant: validator.clone(),
1867                        propose_latency: (10.0, 5.0),
1868                        verify_latency: (10.0, 5.0),
1869                    };
1870                    let (actor, application) = mocks::application::Application::new(
1871                        context.with_label("application"),
1872                        application_cfg,
1873                    );
1874                    actor.start();
1875                    let cfg = config::Config {
1876                        crypto: scheme,
1877                        automaton: application.clone(),
1878                        relay: application.clone(),
1879                        reporter: supervisor.clone(),
1880                        partition: validator.to_string(),
1881                        compression: Some(3),
1882                        supervisor,
1883                        mailbox_size: 1024,
1884                        namespace: namespace.clone(),
1885                        leader_timeout: Duration::from_secs(1),
1886                        notarization_timeout: Duration::from_secs(2),
1887                        nullify_retry: Duration::from_secs(10),
1888                        fetch_timeout: Duration::from_secs(1),
1889                        activity_timeout,
1890                        skip_timeout,
1891                        max_fetch_count: 1,
1892                        max_participants: n as usize,
1893                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1894                        fetch_concurrent: 1,
1895                        replay_buffer: 1024 * 1024,
1896                        write_buffer: 1024 * 1024,
1897                    };
1898                    let (voter, resolver) = registrations
1899                        .remove(&validator)
1900                        .expect("validator should be registered");
1901                    let engine = Engine::new(context.with_label("engine"), cfg);
1902                    engine.start(voter, resolver);
1903                }
1904            }
1905
1906            // Wait for all engines to finish
1907            let mut finalizers = Vec::new();
1908            for supervisor in supervisors.iter_mut() {
1909                let (mut latest, mut monitor) = supervisor.subscribe().await;
1910                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1911                    while latest < required_containers {
1912                        latest = monitor.next().await.expect("event missing");
1913                    }
1914                }));
1915            }
1916            join_all(finalizers).await;
1917
1918            // Check supervisors for correct activity
1919            let byz = &validators[0];
1920            let mut count_conflicting_notarize = 0;
1921            let mut count_conflicting_finalize = 0;
1922            for supervisor in supervisors.iter() {
1923                // Ensure only faults for byz
1924                {
1925                    let faults = supervisor.faults.lock().unwrap();
1926                    assert_eq!(faults.len(), 1);
1927                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
1928                    for (_, faults) in faulter.iter() {
1929                        for fault in faults.iter() {
1930                            match fault {
1931                                Activity::ConflictingNotarize(_) => {
1932                                    count_conflicting_notarize += 1;
1933                                }
1934                                Activity::ConflictingFinalize(_) => {
1935                                    count_conflicting_finalize += 1;
1936                                }
1937                                _ => panic!("unexpected fault: {fault:?}"),
1938                            }
1939                        }
1940                    }
1941                }
1942            }
1943            assert!(count_conflicting_notarize > 0);
1944            assert!(count_conflicting_finalize > 0);
1945        });
1946    }
1947
1948    #[test_traced]
1949    #[ignore]
1950    fn test_conflicter() {
1951        for seed in 0..5 {
1952            conflicter(seed);
1953        }
1954    }
1955
1956    fn nuller(seed: u64) {
1957        // Create context
1958        let n = 4;
1959        let required_containers = 50;
1960        let activity_timeout = 10;
1961        let skip_timeout = 5;
1962        let namespace = b"consensus".to_vec();
1963        let cfg = deterministic::Config::new()
1964            .with_seed(seed)
1965            .with_timeout(Some(Duration::from_secs(30)));
1966        let executor = deterministic::Runner::new(cfg);
1967        executor.start(|context| async move {
1968            // Create simulated network
1969            let (network, mut oracle) = Network::new(
1970                context.with_label("network"),
1971                Config {
1972                    max_size: 1024 * 1024,
1973                },
1974            );
1975
1976            // Start network
1977            network.start();
1978
1979            // Register participants
1980            let mut schemes = Vec::new();
1981            let mut validators = Vec::new();
1982            for i in 0..n {
1983                let scheme = PrivateKey::from_seed(i as u64);
1984                let pk = scheme.public_key();
1985                schemes.push(scheme);
1986                validators.push(pk);
1987            }
1988            validators.sort();
1989            schemes.sort_by_key(|s| s.public_key());
1990            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1991            let mut registrations = register_validators(&mut oracle, &validators).await;
1992
1993            // Link all validators
1994            let link = Link {
1995                latency: 10.0,
1996                jitter: 1.0,
1997                success_rate: 1.0,
1998            };
1999            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2000
2001            // Create engines
2002            let relay = Arc::new(mocks::relay::Relay::new());
2003            let mut supervisors = Vec::new();
2004            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2005                // Create scheme context
2006                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2007
2008                // Start engine
2009                let validator = scheme.public_key();
2010                let supervisor_config = mocks::supervisor::Config {
2011                    namespace: namespace.clone(),
2012                    participants: view_validators.clone(),
2013                };
2014                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2015                    supervisor_config,
2016                );
2017                if idx_scheme == 0 {
2018                    let cfg = mocks::nuller::Config {
2019                        crypto: scheme,
2020                        supervisor,
2021                        namespace: namespace.clone(),
2022                    };
2023                    let (voter, _) = registrations
2024                        .remove(&validator)
2025                        .expect("validator should be registered");
2026                    let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2027                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2028                    engine.start(voter);
2029                } else {
2030                    supervisors.push(supervisor.clone());
2031                    let application_cfg = mocks::application::Config {
2032                        hasher: Sha256::default(),
2033                        relay: relay.clone(),
2034                        participant: validator.clone(),
2035                        propose_latency: (10.0, 5.0),
2036                        verify_latency: (10.0, 5.0),
2037                    };
2038                    let (actor, application) = mocks::application::Application::new(
2039                        context.with_label("application"),
2040                        application_cfg,
2041                    );
2042                    actor.start();
2043                    let cfg = config::Config {
2044                        crypto: scheme,
2045                        automaton: application.clone(),
2046                        relay: application.clone(),
2047                        reporter: supervisor.clone(),
2048                        partition: validator.to_string(),
2049                        compression: Some(3),
2050                        supervisor,
2051                        mailbox_size: 1024,
2052                        namespace: namespace.clone(),
2053                        leader_timeout: Duration::from_secs(1),
2054                        notarization_timeout: Duration::from_secs(2),
2055                        nullify_retry: Duration::from_secs(10),
2056                        fetch_timeout: Duration::from_secs(1),
2057                        activity_timeout,
2058                        skip_timeout,
2059                        max_fetch_count: 1,
2060                        max_participants: n as usize,
2061                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2062                        fetch_concurrent: 1,
2063                        replay_buffer: 1024 * 1024,
2064                        write_buffer: 1024 * 1024,
2065                    };
2066                    let (voter, resolver) = registrations
2067                        .remove(&validator)
2068                        .expect("validator should be registered");
2069                    let engine = Engine::new(context.with_label("engine"), cfg);
2070                    engine.start(voter, resolver);
2071                }
2072            }
2073
2074            // Wait for all engines to finish
2075            let mut finalizers = Vec::new();
2076            for supervisor in supervisors.iter_mut() {
2077                let (mut latest, mut monitor) = supervisor.subscribe().await;
2078                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2079                    while latest < required_containers {
2080                        latest = monitor.next().await.expect("event missing");
2081                    }
2082                }));
2083            }
2084            join_all(finalizers).await;
2085
2086            // Check supervisors for correct activity
2087            let byz = &validators[0];
2088            let mut count_nullify_and_finalize = 0;
2089            for supervisor in supervisors.iter() {
2090                // Ensure only faults for byz
2091                {
2092                    let faults = supervisor.faults.lock().unwrap();
2093                    assert_eq!(faults.len(), 1);
2094                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2095                    for (_, faults) in faulter.iter() {
2096                        for fault in faults.iter() {
2097                            match fault {
2098                                Activity::NullifyFinalize(_) => {
2099                                    count_nullify_and_finalize += 1;
2100                                }
2101                                _ => panic!("unexpected fault: {fault:?}"),
2102                            }
2103                        }
2104                    }
2105                }
2106            }
2107            assert!(count_nullify_and_finalize > 0);
2108        });
2109    }
2110
2111    #[test_traced]
2112    #[ignore]
2113    fn test_nuller() {
2114        for seed in 0..5 {
2115            nuller(seed);
2116        }
2117    }
2118
2119    fn outdated(seed: u64) {
2120        // Create context
2121        let n = 4;
2122        let required_containers = 100;
2123        let activity_timeout = 10;
2124        let skip_timeout = 5;
2125        let namespace = b"consensus".to_vec();
2126        let cfg = deterministic::Config::new()
2127            .with_seed(seed)
2128            .with_timeout(Some(Duration::from_secs(30)));
2129        let executor = deterministic::Runner::new(cfg);
2130        executor.start(|context| async move {
2131            // Create simulated network
2132            let (network, mut oracle) = Network::new(
2133                context.with_label("network"),
2134                Config {
2135                    max_size: 1024 * 1024,
2136                },
2137            );
2138
2139            // Start network
2140            network.start();
2141
2142            // Register participants
2143            let mut schemes = Vec::new();
2144            let mut validators = Vec::new();
2145            for i in 0..n {
2146                let scheme = PrivateKey::from_seed(i as u64);
2147                let pk = scheme.public_key();
2148                schemes.push(scheme);
2149                validators.push(pk);
2150            }
2151            validators.sort();
2152            schemes.sort_by_key(|s| s.public_key());
2153            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2154            let mut registrations = register_validators(&mut oracle, &validators).await;
2155
2156            // Link all validators
2157            let link = Link {
2158                latency: 10.0,
2159                jitter: 1.0,
2160                success_rate: 1.0,
2161            };
2162            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2163
2164            // Create engines
2165            let relay = Arc::new(mocks::relay::Relay::new());
2166            let mut supervisors = Vec::new();
2167            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2168                // Create scheme context
2169                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2170
2171                // Start engine
2172                let validator = scheme.public_key();
2173                let supervisor_config = mocks::supervisor::Config {
2174                    namespace: namespace.clone(),
2175                    participants: view_validators.clone(),
2176                };
2177                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2178                    supervisor_config,
2179                );
2180                if idx_scheme == 0 {
2181                    let cfg = mocks::outdated::Config {
2182                        crypto: scheme,
2183                        supervisor,
2184                        namespace: namespace.clone(),
2185                        view_delta: activity_timeout * 4,
2186                    };
2187                    let (voter, _) = registrations
2188                        .remove(&validator)
2189                        .expect("validator should be registered");
2190                    let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2191                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2192                    engine.start(voter);
2193                } else {
2194                    supervisors.push(supervisor.clone());
2195                    let application_cfg = mocks::application::Config {
2196                        hasher: Sha256::default(),
2197                        relay: relay.clone(),
2198                        participant: validator.clone(),
2199                        propose_latency: (10.0, 5.0),
2200                        verify_latency: (10.0, 5.0),
2201                    };
2202                    let (actor, application) = mocks::application::Application::new(
2203                        context.with_label("application"),
2204                        application_cfg,
2205                    );
2206                    actor.start();
2207                    let cfg = config::Config {
2208                        crypto: scheme,
2209                        automaton: application.clone(),
2210                        relay: application.clone(),
2211                        reporter: supervisor.clone(),
2212                        supervisor,
2213                        partition: validator.to_string(),
2214                        compression: Some(3),
2215                        mailbox_size: 1024,
2216                        namespace: namespace.clone(),
2217                        leader_timeout: Duration::from_secs(1),
2218                        notarization_timeout: Duration::from_secs(2),
2219                        nullify_retry: Duration::from_secs(10),
2220                        fetch_timeout: Duration::from_secs(1),
2221                        activity_timeout,
2222                        skip_timeout,
2223                        max_fetch_count: 1,
2224                        max_participants: n as usize,
2225                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2226                        fetch_concurrent: 1,
2227                        replay_buffer: 1024 * 1024,
2228                        write_buffer: 1024 * 1024,
2229                    };
2230                    let (voter, resolver) = registrations
2231                        .remove(&validator)
2232                        .expect("validator should be registered");
2233                    let engine = Engine::new(context.with_label("engine"), cfg);
2234                    engine.start(voter, resolver);
2235                }
2236            }
2237
2238            // Wait for all engines to finish
2239            let mut finalizers = Vec::new();
2240            for supervisor in supervisors.iter_mut() {
2241                let (mut latest, mut monitor) = supervisor.subscribe().await;
2242                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2243                    while latest < required_containers {
2244                        latest = monitor.next().await.expect("event missing");
2245                    }
2246                }));
2247            }
2248            join_all(finalizers).await;
2249
2250            // Ensure no faults
2251            for supervisor in supervisors.iter() {
2252                {
2253                    let faults = supervisor.faults.lock().unwrap();
2254                    assert!(faults.is_empty());
2255                }
2256            }
2257        });
2258    }
2259
2260    #[test_traced]
2261    #[ignore]
2262    fn test_outdated() {
2263        for seed in 0..5 {
2264            outdated(seed);
2265        }
2266    }
2267
2268    #[test_traced]
2269    #[ignore]
2270    fn test_1k() {
2271        // Create context
2272        let n = 10;
2273        let required_containers = 1_000;
2274        let activity_timeout = 10;
2275        let skip_timeout = 5;
2276        let namespace = b"consensus".to_vec();
2277        let cfg = deterministic::Config::new();
2278        let executor = deterministic::Runner::new(cfg);
2279        executor.start(|context| async move {
2280            // Create simulated network
2281            let (network, mut oracle) = Network::new(
2282                context.with_label("network"),
2283                Config {
2284                    max_size: 1024 * 1024,
2285                },
2286            );
2287
2288            // Start network
2289            network.start();
2290
2291            // Register participants
2292            let mut schemes = Vec::new();
2293            let mut validators = Vec::new();
2294            for i in 0..n {
2295                let scheme = PrivateKey::from_seed(i as u64);
2296                let pk = scheme.public_key();
2297                schemes.push(scheme);
2298                validators.push(pk);
2299            }
2300            validators.sort();
2301            schemes.sort_by_key(|s| s.public_key());
2302            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2303            let mut registrations = register_validators(&mut oracle, &validators).await;
2304
2305            // Link all validators
2306            let link = Link {
2307                latency: 80.0,
2308                jitter: 10.0,
2309                success_rate: 0.98,
2310            };
2311            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2312
2313            // Create engines
2314            let relay = Arc::new(mocks::relay::Relay::new());
2315            let mut supervisors = Vec::new();
2316            let mut engine_handlers = Vec::new();
2317            for scheme in schemes.into_iter() {
2318                // Create scheme context
2319                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2320
2321                // Start engine
2322                let validator = scheme.public_key();
2323                let supervisor_config = mocks::supervisor::Config {
2324                    namespace: namespace.clone(),
2325                    participants: view_validators.clone(),
2326                };
2327                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2328                    supervisor_config,
2329                );
2330                supervisors.push(supervisor.clone());
2331                let application_cfg = mocks::application::Config {
2332                    hasher: Sha256::default(),
2333                    relay: relay.clone(),
2334                    participant: validator.clone(),
2335                    propose_latency: (100.0, 50.0),
2336                    verify_latency: (50.0, 40.0),
2337                };
2338                let (actor, application) = mocks::application::Application::new(
2339                    context.with_label("application"),
2340                    application_cfg,
2341                );
2342                actor.start();
2343                let cfg = config::Config {
2344                    crypto: scheme,
2345                    automaton: application.clone(),
2346                    relay: application.clone(),
2347                    reporter: supervisor.clone(),
2348                    partition: validator.to_string(),
2349                    compression: Some(3),
2350                    supervisor,
2351                    mailbox_size: 1024,
2352                    namespace: namespace.clone(),
2353                    leader_timeout: Duration::from_secs(1),
2354                    notarization_timeout: Duration::from_secs(2),
2355                    nullify_retry: Duration::from_secs(10),
2356                    fetch_timeout: Duration::from_secs(1),
2357                    activity_timeout,
2358                    skip_timeout,
2359                    max_fetch_count: 1,
2360                    max_participants: n as usize,
2361                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2362                    fetch_concurrent: 1,
2363                    replay_buffer: 1024 * 1024,
2364                    write_buffer: 1024 * 1024,
2365                };
2366                let (voter, resolver) = registrations
2367                    .remove(&validator)
2368                    .expect("validator should be registered");
2369                let engine = Engine::new(context.with_label("engine"), cfg);
2370                engine_handlers.push(engine.start(voter, resolver));
2371            }
2372
2373            // Wait for all engines to finish
2374            let mut finalizers = Vec::new();
2375            for supervisor in supervisors.iter_mut() {
2376                let (mut latest, mut monitor) = supervisor.subscribe().await;
2377                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2378                    while latest < required_containers {
2379                        latest = monitor.next().await.expect("event missing");
2380                    }
2381                }));
2382            }
2383            join_all(finalizers).await;
2384
2385            // Check supervisors for correct activity
2386            for supervisor in supervisors.iter() {
2387                // Ensure no faults
2388                {
2389                    let faults = supervisor.faults.lock().unwrap();
2390                    assert!(faults.is_empty());
2391                }
2392            }
2393        })
2394    }
2395}