commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal
5//! finalization latency in a partially synchronous setting.
6//!
7//! _If your application would benefit from succinct consensus certificates or a bias-resistant
8//! VRF, see [crate::threshold_simplex]._
9//!
10//! # Features
11//!
12//! * Wicked Fast Block Times (2 Network Hops)
13//! * Optimal Finalization Latency (3 Network Hops)
14//! * Externalized Uptime and Fault Proofs
15//! * Decoupled Block Broadcast and Sync
16//! * Flexible Block Format
17//!
18//! # Design
19//!
20//! ## Architecture
21//!
22//! All logic is split into three components: the `Voter`, the `Resolver`, and the `Application` (provided by the user).
23//! The `Voter` is responsible for participating in the latest view and the `Resolver` is responsible for fetching artifacts
24//! from previous views required to verify proposed blocks in the latest view.
25//!
26//! To provide great performance, all interactions between `Voter`, `Resolver`, and `Application` are
27//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
28//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
29//!
30//! ```txt
31//! +---------------+           +---------+            +++++++++++++++
32//! |               |<----------+         +----------->+             +
33//! |  Application  |           |  Voter  |            +    Peers    +
34//! |               +---------->|         |<-----------+             +
35//! +---------------+           +--+------+            +++++++++++++++
36//!                                |   ^
37//!                                |   |
38//!                                |   |
39//!                                |   |
40//!                                v   |
41//!                            +-------+----+          +++++++++++++++
42//!                            |            +--------->+             +
43//!                            |  Resolver  |          +    Peers    +
44//!                            |            |<---------+             +
45//!                            +------------+          +++++++++++++++
46//! ```
47//!
48//! _Application is usually a single object that implements the `Automaton`, `Relay`, `Committer`,
49//! and `Supervisor` traits._
50//!
51//! ## Joining Consensus
52//!
53//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter`
54//! will enter `v+1`. This means that a new participant joining consensus will immediately jump
55//! ahead to the latest view and begin participating in consensus (assuming it can verify blocks).
56//!
57//! ## Persistence
58//!
59//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
60//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
61//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
62//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
63//! on restart (especially in the case of unclean shutdown).
64//!
65//! ## Protocol Description
66//!
67//! ### Specification for View `v`
68//!
69//! Upon entering view `v`:
70//! * Determine leader `l` for view `v`
71//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
72//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
73//! * If leader `l`, broadcast `notarize(c,v)`
74//!   * If can't propose container in view `v` because missing notarization/nullification for a
75//!     previous view `v_m`, request `v_m`
76//!
77//! Upon receiving first `notarize(c,v)` from `l`:
78//! * Cancel `t_l`
79//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
80//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
81//!
82//! Upon receiving `2f+1` `notarize(c,v)`:
83//! * Cancel `t_a`
84//! * Mark `c` as notarized
85//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
86//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
87//! * Enter `v+1`
88//!
89//! Upon receiving `2f+1` `nullify(v)`:
90//! * Broadcast `nullification(v)`
91//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
92//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
93//!       instead.
94//! * Enter `v+1`
95//!
96//! Upon receiving `2f+1` `finalize(c,v)`:
97//! * Mark `c` as finalized (and recursively finalize its parents)
98//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
99//!
100//! Upon `t_l` or `t_a` firing:
101//! * Broadcast `nullify(v)`
102//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
103//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
104//!
105//! ### Deviations from Simplex Consensus
106//!
107//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
108//!   a set of all notarizations/nullifications for all historical blocks.
109//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
110//!   either a "block" or a "dummy block", respectively.
111//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
112//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
113//!   some number of views (again to trigger early view transition for an unresponsive leader).
114//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
115//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
116
117pub mod types;
118
119cfg_if::cfg_if! {
120    if #[cfg(not(target_arch = "wasm32"))] {
121        mod actors;
122        mod config;
123        pub use config::Config;
124        mod engine;
125        pub use engine::Engine;
126        mod metrics;
127    }
128}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::Monitor;
137    use commonware_cryptography::{
138        ed25519::{PrivateKey, PublicKey},
139        sha256::Digest as Sha256Digest,
140        PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141    };
142    use commonware_macros::{select, test_traced};
143    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
145    use commonware_utils::{quorum, NZUsize, NZU32};
146    use engine::Engine;
147    use futures::{future::join_all, StreamExt};
148    use governor::Quota;
149    use rand::Rng as _;
150    use std::{
151        collections::{BTreeMap, HashMap},
152        num::NonZeroUsize,
153        sync::{Arc, Mutex},
154        time::Duration,
155    };
156    use tracing::debug;
157    use types::Activity;
158
159    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
160    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
161
162    /// Registers all validators using the oracle.
163    async fn register_validators<P: CPublicKey>(
164        oracle: &mut Oracle<P>,
165        validators: &[P],
166    ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
167        let mut registrations = HashMap::new();
168        for validator in validators.iter() {
169            let (voter_sender, voter_receiver) =
170                oracle.register(validator.clone(), 0).await.unwrap();
171            let (resolver_sender, resolver_receiver) =
172                oracle.register(validator.clone(), 1).await.unwrap();
173            registrations.insert(
174                validator.clone(),
175                (
176                    (voter_sender, voter_receiver),
177                    (resolver_sender, resolver_receiver),
178                ),
179            );
180        }
181        registrations
182    }
183
184    /// Enum to describe the action to take when linking validators.
185    enum Action {
186        Link(Link),
187        Update(Link), // Unlink and then link
188        Unlink,
189    }
190
191    /// Links (or unlinks) validators using the oracle.
192    ///
193    /// The `action` parameter determines the action (e.g. link, unlink) to take.
194    /// The `restrict_to` function can be used to restrict the linking to certain connections,
195    /// otherwise all validators will be linked to all other validators.
196    async fn link_validators<P: CPublicKey>(
197        oracle: &mut Oracle<P>,
198        validators: &[P],
199        action: Action,
200        restrict_to: Option<fn(usize, usize, usize) -> bool>,
201    ) {
202        for (i1, v1) in validators.iter().enumerate() {
203            for (i2, v2) in validators.iter().enumerate() {
204                // Ignore self
205                if v2 == v1 {
206                    continue;
207                }
208
209                // Restrict to certain connections
210                if let Some(f) = restrict_to {
211                    if !f(validators.len(), i1, i2) {
212                        continue;
213                    }
214                }
215
216                // Do any unlinking first
217                match action {
218                    Action::Update(_) | Action::Unlink => {
219                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
220                    }
221                    _ => {}
222                }
223
224                // Do any linking after
225                match action {
226                    Action::Link(ref link) | Action::Update(ref link) => {
227                        oracle
228                            .add_link(v1.clone(), v2.clone(), link.clone())
229                            .await
230                            .unwrap();
231                    }
232                    _ => {}
233                }
234            }
235        }
236    }
237
238    #[test_traced]
239    fn test_all_online() {
240        // Create context
241        let n = 5;
242        let threshold = quorum(n);
243        let max_exceptions = 10;
244        let required_containers = 100;
245        let activity_timeout = 10;
246        let skip_timeout = 5;
247        let namespace = b"consensus".to_vec();
248        let executor = deterministic::Runner::timed(Duration::from_secs(30));
249        executor.start(|context| async move {
250            // Create simulated network
251            let (network, mut oracle) = Network::new(
252                context.with_label("network"),
253                Config {
254                    max_size: 1024 * 1024,
255                },
256            );
257
258            // Start network
259            network.start();
260
261            // Register participants
262            let mut schemes = Vec::new();
263            let mut validators = Vec::new();
264            for i in 0..n {
265                let scheme = PrivateKey::from_seed(i as u64);
266                let pk = scheme.public_key();
267                schemes.push(scheme);
268                validators.push(pk);
269            }
270            validators.sort();
271            schemes.sort_by_key(|s| s.public_key());
272            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
273            let mut registrations = register_validators(&mut oracle, &validators).await;
274
275            // Link all validators
276            let link = Link {
277                latency: 10.0,
278                jitter: 1.0,
279                success_rate: 1.0,
280            };
281            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
282
283            // Create engines
284            let relay = Arc::new(mocks::relay::Relay::new());
285            let mut supervisors = Vec::new();
286            let mut engine_handlers = Vec::new();
287            for scheme in schemes.into_iter() {
288                // Create scheme context
289                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
290
291                // Start engine
292                let validator = scheme.public_key();
293                let supervisor_config = mocks::supervisor::Config {
294                    namespace: namespace.clone(),
295                    participants: view_validators.clone(),
296                };
297                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
298                    supervisor_config,
299                );
300                supervisors.push(supervisor.clone());
301                let application_cfg = mocks::application::Config {
302                    hasher: Sha256::default(),
303                    relay: relay.clone(),
304                    participant: validator.clone(),
305                    propose_latency: (10.0, 5.0),
306                    verify_latency: (10.0, 5.0),
307                };
308                let (actor, application) = mocks::application::Application::new(
309                    context.with_label("application"),
310                    application_cfg,
311                );
312                actor.start();
313                let cfg = config::Config {
314                    crypto: scheme,
315                    automaton: application.clone(),
316                    relay: application.clone(),
317                    reporter: supervisor.clone(),
318                    supervisor,
319                    partition: validator.to_string(),
320                    compression: Some(3),
321                    mailbox_size: 1024,
322                    namespace: namespace.clone(),
323                    leader_timeout: Duration::from_secs(1),
324                    notarization_timeout: Duration::from_secs(2),
325                    nullify_retry: Duration::from_secs(10),
326                    fetch_timeout: Duration::from_secs(1),
327                    activity_timeout,
328                    skip_timeout,
329                    max_fetch_count: 1,
330                    max_participants: n as usize,
331                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
332                    fetch_concurrent: 1,
333                    replay_buffer: NZUsize!(1024 * 1024),
334                    write_buffer: NZUsize!(1024 * 1024),
335                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
336                };
337                let engine = Engine::new(context.with_label("engine"), cfg);
338                let (voter, resolver) = registrations
339                    .remove(&validator)
340                    .expect("validator should be registered");
341                engine_handlers.push(engine.start(voter, resolver));
342            }
343
344            // Wait for all engines to finish
345            let mut finalizers = Vec::new();
346            for supervisor in supervisors.iter_mut() {
347                let (mut latest, mut monitor) = supervisor.subscribe().await;
348                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
349                    while latest < required_containers {
350                        latest = monitor.next().await.expect("event missing");
351                    }
352                }));
353            }
354            join_all(finalizers).await;
355
356            // Check supervisors for correct activity
357            let latest_complete = required_containers - activity_timeout;
358            for supervisor in supervisors.iter() {
359                // Ensure no faults
360                {
361                    let faults = supervisor.faults.lock().unwrap();
362                    assert!(faults.is_empty());
363                }
364
365                // Ensure no forks
366                let mut exceptions = 0;
367                let mut notarized = HashMap::new();
368                let mut finalized = HashMap::new();
369                {
370                    let notarizes = supervisor.notarizes.lock().unwrap();
371                    for view in 1..latest_complete {
372                        // Ensure only one payload proposed per view
373                        let Some(payloads) = notarizes.get(&view) else {
374                            exceptions += 1;
375                            continue;
376                        };
377                        if payloads.len() > 1 {
378                            panic!("view: {view}");
379                        }
380                        let (digest, notarizers) = payloads.iter().next().unwrap();
381                        notarized.insert(view, *digest);
382
383                        if notarizers.len() < threshold as usize {
384                            // We can't verify that everyone participated at every view because some nodes may
385                            // have started later.
386                            panic!("view: {view}");
387                        }
388                        if notarizers.len() != n as usize {
389                            exceptions += 1;
390                        }
391                    }
392                }
393                {
394                    let notarizations = supervisor.notarizations.lock().unwrap();
395                    for view in 1..latest_complete {
396                        // Ensure notarization matches digest from notarizes
397                        let Some(notarization) = notarizations.get(&view) else {
398                            exceptions += 1;
399                            continue;
400                        };
401                        let Some(digest) = notarized.get(&view) else {
402                            exceptions += 1;
403                            continue;
404                        };
405                        assert_eq!(&notarization.proposal.payload, digest);
406                    }
407                }
408                {
409                    let finalizes = supervisor.finalizes.lock().unwrap();
410                    for view in 1..latest_complete {
411                        // Ensure only one payload proposed per view
412                        let Some(payloads) = finalizes.get(&view) else {
413                            exceptions += 1;
414                            continue;
415                        };
416                        if payloads.len() > 1 {
417                            panic!("view: {view}");
418                        }
419                        let (digest, finalizers) = payloads.iter().next().unwrap();
420                        finalized.insert(view, *digest);
421
422                        // Only check at views below timeout
423                        if view > latest_complete {
424                            continue;
425                        }
426
427                        // Ensure everyone participating
428                        if finalizers.len() < threshold as usize {
429                            // We can't verify that everyone participated at every view because some nodes may
430                            // have started later.
431                            panic!("view: {view}");
432                        }
433                        if finalizers.len() != n as usize {
434                            exceptions += 1;
435                        }
436
437                        // Ensure no nullifies for any finalizers
438                        let nullifies = supervisor.nullifies.lock().unwrap();
439                        let Some(nullifies) = nullifies.get(&view) else {
440                            continue;
441                        };
442                        for (_, finalizers) in payloads.iter() {
443                            for finalizer in finalizers.iter() {
444                                if nullifies.contains(finalizer) {
445                                    panic!("should not nullify and finalize at same view");
446                                }
447                            }
448                        }
449                    }
450                }
451                {
452                    let finalizations = supervisor.finalizations.lock().unwrap();
453                    for view in 1..latest_complete {
454                        // Ensure finalization matches digest from finalizes
455                        let Some(finalization) = finalizations.get(&view) else {
456                            exceptions += 1;
457                            continue;
458                        };
459                        let Some(digest) = finalized.get(&view) else {
460                            exceptions += 1;
461                            continue;
462                        };
463                        assert_eq!(&finalization.proposal.payload, digest);
464                    }
465                }
466
467                // Ensure exceptions within allowed
468                assert!(exceptions <= max_exceptions);
469            }
470        });
471    }
472
473    #[test_traced]
474    fn test_unclean_shutdown() {
475        // Create context
476        let n = 5;
477        let required_containers = 100;
478        let activity_timeout = 10;
479        let skip_timeout = 5;
480        let namespace = b"consensus".to_vec();
481
482        // Random restarts every x seconds
483        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
484        let supervised = Arc::new(Mutex::new(Vec::new()));
485        let mut prev_ctx = None;
486
487        loop {
488            let namespace = namespace.clone();
489            let shutdowns = shutdowns.clone();
490            let supervised = supervised.clone();
491
492            let f = |mut context: deterministic::Context| async move {
493                // Create simulated network
494                let (network, mut oracle) = Network::new(
495                    context.with_label("network"),
496                    Config {
497                        max_size: 1024 * 1024,
498                    },
499                );
500
501                // Start network
502                network.start();
503
504                // Register participants
505                let mut schemes = Vec::new();
506                let mut validators = Vec::new();
507                for i in 0..n {
508                    let scheme = PrivateKey::from_seed(i as u64);
509                    let pk = scheme.public_key();
510                    schemes.push(scheme);
511                    validators.push(pk);
512                }
513                validators.sort();
514                schemes.sort_by_key(|s| s.public_key());
515                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
516                let mut registrations = register_validators(&mut oracle, &validators).await;
517
518                // Link all validators
519                let link = Link {
520                    latency: 50.0,
521                    jitter: 50.0,
522                    success_rate: 1.0,
523                };
524                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
525
526                // Create engines
527                let relay = Arc::new(mocks::relay::Relay::new());
528                let mut supervisors = HashMap::new();
529                let mut engine_handlers = Vec::new();
530                for scheme in schemes.into_iter() {
531                    // Create scheme context
532                    let context = context
533                        .clone()
534                        .with_label(&format!("validator-{}", scheme.public_key()));
535
536                    // Start engine
537                    let validator = scheme.public_key();
538                    let supervisor_config = mocks::supervisor::Config {
539                        namespace: namespace.clone(),
540                        participants: view_validators.clone(),
541                    };
542                    let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
543                        supervisor_config,
544                    );
545                    supervisors.insert(validator.clone(), supervisor.clone());
546                    let application_cfg = mocks::application::Config {
547                        hasher: Sha256::default(),
548                        relay: relay.clone(),
549                        participant: validator.clone(),
550                        propose_latency: (10.0, 5.0),
551                        verify_latency: (10.0, 5.0),
552                    };
553                    let (actor, application) = mocks::application::Application::new(
554                        context.with_label("application"),
555                        application_cfg,
556                    );
557                    actor.start();
558                    let cfg = config::Config {
559                        crypto: scheme,
560                        automaton: application.clone(),
561                        relay: application.clone(),
562                        reporter: supervisor.clone(),
563                        supervisor,
564                        partition: validator.to_string(),
565                        compression: Some(3),
566                        mailbox_size: 1024,
567                        namespace: namespace.clone(),
568                        leader_timeout: Duration::from_secs(1),
569                        notarization_timeout: Duration::from_secs(2),
570                        nullify_retry: Duration::from_secs(10),
571                        fetch_timeout: Duration::from_secs(1),
572                        activity_timeout,
573                        skip_timeout,
574                        max_participants: n as usize,
575                        max_fetch_count: 1,
576                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
577                        fetch_concurrent: 1,
578                        replay_buffer: NZUsize!(1024 * 1024),
579                        write_buffer: NZUsize!(1024 * 1024),
580                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
581                    };
582                    let engine = Engine::new(context.with_label("engine"), cfg);
583                    let (voter_network, resolver_network) = registrations
584                        .remove(&validator)
585                        .expect("validator should be registered");
586                    engine_handlers.push(engine.start(voter_network, resolver_network));
587                }
588
589                // Store all finalizer handles
590                let mut finalizers = Vec::new();
591                for (_, supervisor) in supervisors.iter_mut() {
592                    let (mut latest, mut monitor) = supervisor.subscribe().await;
593                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
594                        while latest < required_containers {
595                            latest = monitor.next().await.expect("event missing");
596                        }
597                    }));
598                }
599
600                // Exit at random points for unclean shutdown of entire set
601                let wait =
602                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
603                select! {
604                    _ = context.sleep(wait) => {
605                        // Collect supervisors to check faults
606                        {
607                            let mut shutdowns = shutdowns.lock().unwrap();
608                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
609                            *shutdowns += 1;
610                        }
611                        supervised.lock().unwrap().push(supervisors);
612                        (false,context)
613                    },
614                    _ = join_all(finalizers) => {
615                        // Check supervisors for faults activity
616                        let supervised = supervised.lock().unwrap();
617                        for supervisors in supervised.iter() {
618                            for (_, supervisor) in supervisors.iter() {
619                                let faults = supervisor.faults.lock().unwrap();
620                                assert!(faults.is_empty());
621                            }
622                        }
623                        (true,context)
624                    }
625                }
626            };
627
628            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
629                deterministic::Runner::from(prev_ctx)
630            } else {
631                deterministic::Runner::timed(Duration::from_secs(30))
632            }
633            .start(f);
634
635            // If we are done, break
636            if complete {
637                break;
638            }
639
640            prev_ctx = Some(context.recover());
641        }
642    }
643
644    #[test_traced]
645    fn test_backfill() {
646        // Create context
647        let n = 4;
648        let required_containers = 100;
649        let activity_timeout = 10;
650        let skip_timeout = 5;
651        let namespace = b"consensus".to_vec();
652        let executor = deterministic::Runner::timed(Duration::from_secs(360));
653        executor.start(|context| async move {
654            // Create simulated network
655            let (network, mut oracle) = Network::new(
656                context.with_label("network"),
657                Config {
658                    max_size: 1024 * 1024,
659                },
660            );
661
662            // Start network
663            network.start();
664
665            // Register participants
666            let mut schemes = Vec::new();
667            let mut validators = Vec::new();
668            for i in 0..n {
669                let scheme = PrivateKey::from_seed(i as u64);
670                let pk = scheme.public_key();
671                schemes.push(scheme);
672                validators.push(pk);
673            }
674            validators.sort();
675            schemes.sort_by_key(|s| s.public_key());
676            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
677            let mut registrations = register_validators(&mut oracle, &validators).await;
678
679            // Link all validators except first
680            let link = Link {
681                latency: 10.0,
682                jitter: 1.0,
683                success_rate: 1.0,
684            };
685            link_validators(
686                &mut oracle,
687                &validators,
688                Action::Link(link),
689                Some(|_, i, j| ![i, j].contains(&0usize)),
690            )
691            .await;
692
693            // Create engines
694            let relay = Arc::new(mocks::relay::Relay::new());
695            let mut supervisors = Vec::new();
696            let mut engine_handlers = Vec::new();
697            for (idx_scheme, scheme) in schemes.iter().enumerate() {
698                // Skip first peer
699                if idx_scheme == 0 {
700                    continue;
701                }
702
703                // Create scheme context
704                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
705
706                // Start engine
707                let validator = scheme.public_key();
708                let supervisor_config = mocks::supervisor::Config {
709                    namespace: namespace.clone(),
710                    participants: view_validators.clone(),
711                };
712                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
713                    supervisor_config,
714                );
715                supervisors.push(supervisor.clone());
716                let application_cfg = mocks::application::Config {
717                    hasher: Sha256::default(),
718                    relay: relay.clone(),
719                    participant: validator.clone(),
720                    propose_latency: (10.0, 5.0),
721                    verify_latency: (10.0, 5.0),
722                };
723                let (actor, application) = mocks::application::Application::new(
724                    context.with_label("application"),
725                    application_cfg,
726                );
727                actor.start();
728                let cfg = config::Config {
729                    crypto: scheme.clone(),
730                    automaton: application.clone(),
731                    relay: application.clone(),
732                    reporter: supervisor.clone(),
733                    supervisor,
734                    partition: validator.to_string(),
735                    compression: Some(3),
736                    mailbox_size: 1024,
737                    namespace: namespace.clone(),
738                    leader_timeout: Duration::from_secs(1),
739                    notarization_timeout: Duration::from_secs(2),
740                    nullify_retry: Duration::from_secs(10),
741                    fetch_timeout: Duration::from_secs(1),
742                    activity_timeout,
743                    skip_timeout,
744                    max_fetch_count: 1, // force many fetches
745                    max_participants: n as usize,
746                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
747                    fetch_concurrent: 1,
748                    replay_buffer: NZUsize!(1024 * 1024),
749                    write_buffer: NZUsize!(1024 * 1024),
750                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
751                };
752                let (voter, resolver) = registrations
753                    .remove(&validator)
754                    .expect("validator should be registered");
755                let engine = Engine::new(context.with_label("engine"), cfg);
756                engine_handlers.push(engine.start(voter, resolver));
757            }
758
759            // Wait for all engines to finish
760            let mut finalizers = Vec::new();
761            for supervisor in supervisors.iter_mut() {
762                let (mut latest, mut monitor) = supervisor.subscribe().await;
763                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
764                    while latest < required_containers {
765                        latest = monitor.next().await.expect("event missing");
766                    }
767                }));
768            }
769            join_all(finalizers).await;
770
771            // Degrade network connections for online peers
772            let link = Link {
773                latency: 3_000.0,
774                jitter: 0.0,
775                success_rate: 1.0,
776            };
777            link_validators(
778                &mut oracle,
779                &validators,
780                Action::Update(link.clone()),
781                Some(|_, i, j| ![i, j].contains(&0usize)),
782            )
783            .await;
784
785            // Wait for nullifications to accrue
786            context.sleep(Duration::from_secs(120)).await;
787
788            // Unlink second peer from all (except first)
789            link_validators(
790                &mut oracle,
791                &validators,
792                Action::Unlink,
793                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
794            )
795            .await;
796
797            // Start engine for first peer
798            let scheme = schemes[0].clone();
799            let validator = scheme.public_key();
800
801            // Create scheme context
802            let context = context.with_label(&format!("validator-{}", scheme.public_key()));
803
804            // Link first peer to all (except second)
805            link_validators(
806                &mut oracle,
807                &validators,
808                Action::Link(link),
809                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
810            )
811            .await;
812
813            // Restore network connections for all online peers
814            let link = Link {
815                latency: 10.0,
816                jitter: 2.5,
817                success_rate: 1.0,
818            };
819            link_validators(
820                &mut oracle,
821                &validators,
822                Action::Update(link),
823                Some(|_, i, j| ![i, j].contains(&1usize)),
824            )
825            .await;
826
827            // Start engine
828            let supervisor_config = mocks::supervisor::Config {
829                namespace: namespace.clone(),
830                participants: view_validators.clone(),
831            };
832            let mut supervisor =
833                mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
834            supervisors.push(supervisor.clone());
835            let application_cfg = mocks::application::Config {
836                hasher: Sha256::default(),
837                relay: relay.clone(),
838                participant: validator.clone(),
839                propose_latency: (10.0, 5.0),
840                verify_latency: (10.0, 5.0),
841            };
842            let (actor, application) = mocks::application::Application::new(
843                context.with_label("application"),
844                application_cfg,
845            );
846            actor.start();
847            let cfg = config::Config {
848                crypto: scheme,
849                automaton: application.clone(),
850                relay: application.clone(),
851                reporter: supervisor.clone(),
852                supervisor: supervisor.clone(),
853                partition: validator.to_string(),
854                compression: Some(3),
855                mailbox_size: 1024,
856                namespace: namespace.clone(),
857                leader_timeout: Duration::from_secs(1),
858                notarization_timeout: Duration::from_secs(2),
859                nullify_retry: Duration::from_secs(10),
860                fetch_timeout: Duration::from_secs(1),
861                activity_timeout,
862                skip_timeout,
863                max_fetch_count: 1,
864                max_participants: n as usize,
865                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
866                fetch_concurrent: 1,
867                replay_buffer: NZUsize!(1024 * 1024),
868                write_buffer: NZUsize!(1024 * 1024),
869                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
870            };
871            let (voter, resolver) = registrations
872                .remove(&validator)
873                .expect("validator should be registered");
874            let engine = Engine::new(context.with_label("engine"), cfg);
875            engine_handlers.push(engine.start(voter, resolver));
876
877            // Wait for new engine to finalize required
878            let (mut latest, mut monitor) = supervisor.subscribe().await;
879            while latest < required_containers {
880                latest = monitor.next().await.expect("event missing");
881            }
882        });
883    }
884
885    #[test_traced]
886    fn test_one_offline() {
887        // Create context
888        let n = 5;
889        let threshold = quorum(n);
890        let required_containers = 100;
891        let activity_timeout = 10;
892        let skip_timeout = 5;
893        let namespace = b"consensus".to_vec();
894        let executor = deterministic::Runner::timed(Duration::from_secs(30));
895        executor.start(|context| async move {
896            // Create simulated network
897            let (network, mut oracle) = Network::new(
898                context.with_label("network"),
899                Config {
900                    max_size: 1024 * 1024,
901                },
902            );
903
904            // Start network
905            network.start();
906
907            // Register participants
908            let mut schemes = Vec::new();
909            let mut validators = Vec::new();
910            for i in 0..n {
911                let scheme = PrivateKey::from_seed(i as u64);
912                let pk = scheme.public_key();
913                schemes.push(scheme);
914                validators.push(pk);
915            }
916            validators.sort();
917            schemes.sort_by_key(|s| s.public_key());
918            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
919            let mut registrations = register_validators(&mut oracle, &validators).await;
920
921            // Link all validators except first
922            let link = Link {
923                latency: 10.0,
924                jitter: 1.0,
925                success_rate: 1.0,
926            };
927            link_validators(
928                &mut oracle,
929                &validators,
930                Action::Link(link),
931                Some(|_, i, j| ![i, j].contains(&0usize)),
932            )
933            .await;
934
935            // Create engines
936            let relay = Arc::new(mocks::relay::Relay::new());
937            let mut supervisors = Vec::new();
938            let mut engine_handlers = Vec::new();
939            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
940                // Skip first peer
941                if idx_scheme == 0 {
942                    continue;
943                }
944
945                // Create scheme context
946                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
947
948                // Start engine
949                let validator = scheme.public_key();
950                let supervisor_config = mocks::supervisor::Config {
951                    namespace: namespace.clone(),
952                    participants: view_validators.clone(),
953                };
954                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
955                    supervisor_config,
956                );
957                supervisors.push(supervisor.clone());
958                let application_cfg = mocks::application::Config {
959                    hasher: Sha256::default(),
960                    relay: relay.clone(),
961                    participant: validator.clone(),
962                    propose_latency: (10.0, 5.0),
963                    verify_latency: (10.0, 5.0),
964                };
965                let (actor, application) = mocks::application::Application::new(
966                    context.with_label("application"),
967                    application_cfg,
968                );
969                actor.start();
970                let cfg = config::Config {
971                    crypto: scheme,
972                    automaton: application.clone(),
973                    relay: application.clone(),
974                    reporter: supervisor.clone(),
975                    partition: validator.to_string(),
976                    compression: Some(3),
977                    supervisor,
978                    mailbox_size: 1024,
979                    namespace: namespace.clone(),
980                    leader_timeout: Duration::from_secs(1),
981                    notarization_timeout: Duration::from_secs(2),
982                    nullify_retry: Duration::from_secs(10),
983                    fetch_timeout: Duration::from_secs(1),
984                    activity_timeout,
985                    skip_timeout,
986                    max_participants: n as usize,
987                    max_fetch_count: 1,
988                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
989                    fetch_concurrent: 1,
990                    replay_buffer: NZUsize!(1024 * 1024),
991                    write_buffer: NZUsize!(1024 * 1024),
992                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
993                };
994                let (voter, resolver) = registrations
995                    .remove(&validator)
996                    .expect("validator should be registered");
997                let engine = Engine::new(context.with_label("engine"), cfg);
998                engine_handlers.push(engine.start(voter, resolver));
999            }
1000
1001            // Wait for all engines to finish
1002            let mut finalizers = Vec::new();
1003            for supervisor in supervisors.iter_mut() {
1004                let (mut latest, mut monitor) = supervisor.subscribe().await;
1005                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1006                    while latest < required_containers {
1007                        latest = monitor.next().await.expect("event missing");
1008                    }
1009                }));
1010            }
1011            join_all(finalizers).await;
1012
1013            // Check supervisors for correct activity
1014            let offline = &validators[0];
1015            for supervisor in supervisors.iter() {
1016                // Ensure no faults
1017                {
1018                    let faults = supervisor.faults.lock().unwrap();
1019                    assert!(faults.is_empty());
1020                }
1021
1022                // Ensure offline node is never active
1023                {
1024                    let notarizes = supervisor.notarizes.lock().unwrap();
1025                    for (view, payloads) in notarizes.iter() {
1026                        for (_, participants) in payloads.iter() {
1027                            if participants.contains(offline) {
1028                                panic!("view: {view}");
1029                            }
1030                        }
1031                    }
1032                }
1033                {
1034                    let nullifies = supervisor.nullifies.lock().unwrap();
1035                    for (view, participants) in nullifies.iter() {
1036                        if participants.contains(offline) {
1037                            panic!("view: {view}");
1038                        }
1039                    }
1040                }
1041                {
1042                    let finalizes = supervisor.finalizes.lock().unwrap();
1043                    for (view, payloads) in finalizes.iter() {
1044                        for (_, finalizers) in payloads.iter() {
1045                            if finalizers.contains(offline) {
1046                                panic!("view: {view}");
1047                            }
1048                        }
1049                    }
1050                }
1051
1052                // Identify offline views
1053                let mut offline_views = Vec::new();
1054                {
1055                    let leaders = supervisor.leaders.lock().unwrap();
1056                    for (view, leader) in leaders.iter() {
1057                        if leader == offline {
1058                            offline_views.push(*view);
1059                        }
1060                    }
1061                }
1062                assert!(!offline_views.is_empty());
1063
1064                // Ensure nullifies/nullification collected for offline node
1065                {
1066                    let nullifies = supervisor.nullifies.lock().unwrap();
1067                    for view in offline_views.iter() {
1068                        let nullifies = nullifies.get(view).unwrap();
1069                        if nullifies.len() < threshold as usize {
1070                            panic!("view: {view}");
1071                        }
1072                    }
1073                }
1074                {
1075                    let nullifications = supervisor.nullifications.lock().unwrap();
1076                    for view in offline_views.iter() {
1077                        nullifications.get(view).unwrap();
1078                    }
1079                }
1080            }
1081
1082            // Ensure we are skipping views
1083            let encoded = context.encode();
1084            let lines = encoded.lines();
1085            let mut skipped_views = 0;
1086            let mut nodes_skipping = 0;
1087            for line in lines {
1088                if line.contains("_engine_voter_skipped_views_total") {
1089                    let parts: Vec<&str> = line.split_whitespace().collect();
1090                    if let Some(number_str) = parts.last() {
1091                        if let Ok(number) = number_str.parse::<u64>() {
1092                            if number > 0 {
1093                                nodes_skipping += 1;
1094                            }
1095                            if number > skipped_views {
1096                                skipped_views = number;
1097                            }
1098                        }
1099                    }
1100                }
1101            }
1102            assert!(
1103                skipped_views > 0,
1104                "expected skipped views to be greater than 0"
1105            );
1106            assert_eq!(
1107                nodes_skipping,
1108                n - 1,
1109                "expected all online nodes to be skipping views"
1110            );
1111        });
1112    }
1113
1114    #[test_traced]
1115    fn test_slow_validator() {
1116        // Create context
1117        let n = 5;
1118        let required_containers = 50;
1119        let activity_timeout = 10;
1120        let skip_timeout = 5;
1121        let namespace = b"consensus".to_vec();
1122        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1123        executor.start(|context| async move {
1124            // Create simulated network
1125            let (network, mut oracle) = Network::new(
1126                context.with_label("network"),
1127                Config {
1128                    max_size: 1024 * 1024,
1129                },
1130            );
1131
1132            // Start network
1133            network.start();
1134
1135            // Register participants
1136            let mut schemes = Vec::new();
1137            let mut validators = Vec::new();
1138            for i in 0..n {
1139                let scheme = PrivateKey::from_seed(i as u64);
1140                let pk = scheme.public_key();
1141                schemes.push(scheme);
1142                validators.push(pk);
1143            }
1144            validators.sort();
1145            schemes.sort_by_key(|s| s.public_key());
1146            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1147            let mut registrations = register_validators(&mut oracle, &validators).await;
1148
1149            // Link all validators
1150            let link = Link {
1151                latency: 10.0,
1152                jitter: 1.0,
1153                success_rate: 1.0,
1154            };
1155            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1156
1157            // Create engines
1158            let relay = Arc::new(mocks::relay::Relay::new());
1159            let mut supervisors = Vec::new();
1160            let mut engine_handlers = Vec::new();
1161            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1162                // Create scheme context
1163                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1164
1165                // Start engine
1166                let validator = scheme.public_key();
1167                let supervisor_config = mocks::supervisor::Config {
1168                    namespace: namespace.clone(),
1169                    participants: view_validators.clone(),
1170                };
1171                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1172                    supervisor_config,
1173                );
1174                supervisors.push(supervisor.clone());
1175                let application_cfg = if idx_scheme == 0 {
1176                    mocks::application::Config {
1177                        hasher: Sha256::default(),
1178                        relay: relay.clone(),
1179                        participant: validator.clone(),
1180                        propose_latency: (3_000.0, 0.0),
1181                        verify_latency: (3_000.0, 5.0),
1182                    }
1183                } else {
1184                    mocks::application::Config {
1185                        hasher: Sha256::default(),
1186                        relay: relay.clone(),
1187                        participant: validator.clone(),
1188                        propose_latency: (10.0, 5.0),
1189                        verify_latency: (10.0, 5.0),
1190                    }
1191                };
1192                let (actor, application) = mocks::application::Application::new(
1193                    context.with_label("application"),
1194                    application_cfg,
1195                );
1196                actor.start();
1197                let cfg = config::Config {
1198                    crypto: scheme,
1199                    automaton: application.clone(),
1200                    relay: application.clone(),
1201                    reporter: supervisor.clone(),
1202                    partition: validator.to_string(),
1203                    compression: Some(3),
1204                    supervisor,
1205                    mailbox_size: 1024,
1206                    namespace: namespace.clone(),
1207                    leader_timeout: Duration::from_secs(1),
1208                    notarization_timeout: Duration::from_secs(2),
1209                    nullify_retry: Duration::from_secs(10),
1210                    fetch_timeout: Duration::from_secs(1),
1211                    activity_timeout,
1212                    skip_timeout,
1213                    max_fetch_count: 1,
1214                    max_participants: n as usize,
1215                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1216                    fetch_concurrent: 1,
1217                    replay_buffer: NZUsize!(1024 * 1024),
1218                    write_buffer: NZUsize!(1024 * 1024),
1219                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1220                };
1221                let (voter, resolver) = registrations
1222                    .remove(&validator)
1223                    .expect("validator should be registered");
1224                let engine = Engine::new(context.with_label("engine"), cfg);
1225                engine_handlers.push(engine.start(voter, resolver));
1226            }
1227
1228            // Wait for all engines to finish
1229            let mut finalizers = Vec::new();
1230            for supervisor in supervisors.iter_mut() {
1231                let (mut latest, mut monitor) = supervisor.subscribe().await;
1232                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1233                    while latest < required_containers {
1234                        latest = monitor.next().await.expect("event missing");
1235                    }
1236                }));
1237            }
1238            join_all(finalizers).await;
1239
1240            // Check supervisors for correct activity
1241            let slow = &validators[0];
1242            for supervisor in supervisors.iter() {
1243                // Ensure no faults
1244                {
1245                    let faults = supervisor.faults.lock().unwrap();
1246                    assert!(faults.is_empty());
1247                }
1248
1249                // Ensure slow node is never active (will never process anything fast enough to nullify)
1250                {
1251                    let notarizes = supervisor.notarizes.lock().unwrap();
1252                    for (view, payloads) in notarizes.iter() {
1253                        for (_, participants) in payloads.iter() {
1254                            if participants.contains(slow) {
1255                                panic!("view: {view}");
1256                            }
1257                        }
1258                    }
1259                }
1260                {
1261                    let nullifies = supervisor.nullifies.lock().unwrap();
1262                    for (view, participants) in nullifies.iter() {
1263                        // Start checking once all are online (leader may never have proposed)
1264                        if *view > 10 && participants.contains(slow) {
1265                            panic!("view: {view}");
1266                        }
1267                    }
1268                }
1269                {
1270                    let finalizes = supervisor.finalizes.lock().unwrap();
1271                    for (view, payloads) in finalizes.iter() {
1272                        for (_, finalizers) in payloads.iter() {
1273                            if finalizers.contains(slow) {
1274                                panic!("view: {view}");
1275                            }
1276                        }
1277                    }
1278                }
1279            }
1280        });
1281    }
1282
1283    #[test_traced]
1284    fn test_all_recovery() {
1285        // Create context
1286        let n = 5;
1287        let required_containers = 100;
1288        let activity_timeout = 10;
1289        let skip_timeout = 2;
1290        let namespace = b"consensus".to_vec();
1291        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1292        executor.start(|context| async move {
1293            // Create simulated network
1294            let (network, mut oracle) = Network::new(
1295                context.with_label("network"),
1296                Config {
1297                    max_size: 1024 * 1024,
1298                },
1299            );
1300
1301            // Start network
1302            network.start();
1303
1304            // Register participants
1305            let mut schemes = Vec::new();
1306            let mut validators = Vec::new();
1307            for i in 0..n {
1308                let scheme = PrivateKey::from_seed(i as u64);
1309                let pk = scheme.public_key();
1310                schemes.push(scheme);
1311                validators.push(pk);
1312            }
1313            validators.sort();
1314            schemes.sort_by_key(|s| s.public_key());
1315            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1316            let mut registrations = register_validators(&mut oracle, &validators).await;
1317
1318            // Link all validators
1319            let link = Link {
1320                latency: 3_000.0,
1321                jitter: 0.0,
1322                success_rate: 1.0,
1323            };
1324            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1325
1326            // Create engines
1327            let relay = Arc::new(mocks::relay::Relay::new());
1328            let mut supervisors = Vec::new();
1329            let mut engine_handlers = Vec::new();
1330            for scheme in schemes.iter() {
1331                // Create scheme context
1332                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1333
1334                // Start engine
1335                let validator = scheme.public_key();
1336                let supervisor_config = mocks::supervisor::Config {
1337                    namespace: namespace.clone(),
1338                    participants: view_validators.clone(),
1339                };
1340                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1341                    supervisor_config,
1342                );
1343                supervisors.push(supervisor.clone());
1344                let application_cfg = mocks::application::Config {
1345                    hasher: Sha256::default(),
1346                    relay: relay.clone(),
1347                    participant: validator.clone(),
1348                    propose_latency: (10.0, 5.0),
1349                    verify_latency: (10.0, 5.0),
1350                };
1351                let (actor, application) = mocks::application::Application::new(
1352                    context.with_label("application"),
1353                    application_cfg,
1354                );
1355                actor.start();
1356                let cfg = config::Config {
1357                    crypto: scheme.clone(),
1358                    automaton: application.clone(),
1359                    relay: application.clone(),
1360                    reporter: supervisor.clone(),
1361                    partition: validator.to_string(),
1362                    compression: Some(3),
1363                    supervisor,
1364                    mailbox_size: 1024,
1365                    namespace: namespace.clone(),
1366                    leader_timeout: Duration::from_secs(1),
1367                    notarization_timeout: Duration::from_secs(2),
1368                    nullify_retry: Duration::from_secs(10),
1369                    fetch_timeout: Duration::from_secs(1),
1370                    activity_timeout,
1371                    skip_timeout,
1372                    max_fetch_count: 1,
1373                    max_participants: n as usize,
1374                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1375                    fetch_concurrent: 1,
1376                    replay_buffer: NZUsize!(1024 * 1024),
1377                    write_buffer: NZUsize!(1024 * 1024),
1378                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1379                };
1380                let (voter, resolver) = registrations
1381                    .remove(&validator)
1382                    .expect("validator should be registered");
1383                let engine = Engine::new(context.with_label("engine"), cfg);
1384                engine_handlers.push(engine.start(voter, resolver));
1385            }
1386
1387            // Wait for a few virtual minutes (shouldn't finalize anything)
1388            let mut finalizers = Vec::new();
1389            for supervisor in supervisors.iter_mut() {
1390                let (_, mut monitor) = supervisor.subscribe().await;
1391                finalizers.push(
1392                    context
1393                        .with_label("finalizer")
1394                        .spawn(move |context| async move {
1395                            select! {
1396                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1397                                _done = monitor.next() => {
1398                                    panic!("engine should not notarize or finalize anything");
1399                                }
1400                            }
1401                        }),
1402                );
1403            }
1404            join_all(finalizers).await;
1405
1406            // Unlink all validators to get latest view
1407            link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1408
1409            // Wait for a virtual minute (nothing should happen)
1410            context.sleep(Duration::from_secs(60)).await;
1411
1412            // Get latest view
1413            let mut latest = 0;
1414            for supervisor in supervisors.iter() {
1415                let nullifies = supervisor.nullifies.lock().unwrap();
1416                let max = nullifies.keys().max().unwrap();
1417                if *max > latest {
1418                    latest = *max;
1419                }
1420            }
1421
1422            // Update links
1423            let link = Link {
1424                latency: 10.0,
1425                jitter: 1.0,
1426                success_rate: 1.0,
1427            };
1428            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1429
1430            // Wait for all engines to finish
1431            let mut finalizers = Vec::new();
1432            for supervisor in supervisors.iter_mut() {
1433                let (mut latest, mut monitor) = supervisor.subscribe().await;
1434                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1435                    while latest < required_containers {
1436                        latest = monitor.next().await.expect("event missing");
1437                    }
1438                }));
1439            }
1440            join_all(finalizers).await;
1441
1442            // Check supervisors for correct activity
1443            for supervisor in supervisors.iter() {
1444                // Ensure no faults
1445                {
1446                    let faults = supervisor.faults.lock().unwrap();
1447                    assert!(faults.is_empty());
1448                }
1449
1450                // Ensure quick recovery.
1451                //
1452                // If the skip timeout isn't implemented correctly, we may go many views before participants
1453                // start to consider a validator's proposal.
1454                {
1455                    // Ensure nearly all views around latest finalize
1456                    let mut found = 0;
1457                    let finalizations = supervisor.finalizations.lock().unwrap();
1458                    for i in latest..latest + activity_timeout {
1459                        if finalizations.contains_key(&i) {
1460                            found += 1;
1461                        }
1462                    }
1463                    assert!(found >= activity_timeout - 2, "found: {found}");
1464                }
1465            }
1466        });
1467    }
1468
1469    #[test_traced]
1470    #[ignore]
1471    fn test_partition() {
1472        // Create context
1473        let n = 10;
1474        let required_containers = 50;
1475        let activity_timeout = 10;
1476        let skip_timeout = 5;
1477        let namespace = b"consensus".to_vec();
1478        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1479        executor.start(|context| async move {
1480            // Create simulated network
1481            let (network, mut oracle) = Network::new(
1482                context.with_label("network"),
1483                Config {
1484                    max_size: 1024 * 1024,
1485                },
1486            );
1487
1488            // Start network
1489            network.start();
1490
1491            // Register participants
1492            let mut schemes = Vec::new();
1493            let mut validators = Vec::new();
1494            for i in 0..n {
1495                let scheme = PrivateKey::from_seed(i as u64);
1496                let pk = scheme.public_key();
1497                schemes.push(scheme);
1498                validators.push(pk);
1499            }
1500            validators.sort();
1501            schemes.sort_by_key(|s| s.public_key());
1502            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1503            let mut registrations = register_validators(&mut oracle, &validators).await;
1504
1505            // Link all validators
1506            let link = Link {
1507                latency: 10.0,
1508                jitter: 1.0,
1509                success_rate: 1.0,
1510            };
1511            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1512
1513            // Create engines
1514            let relay = Arc::new(mocks::relay::Relay::new());
1515            let mut supervisors = Vec::new();
1516            let mut engine_handlers = Vec::new();
1517            for scheme in schemes.iter() {
1518                // Start engine
1519                let validator = scheme.public_key();
1520                let supervisor_config = mocks::supervisor::Config {
1521                    namespace: namespace.clone(),
1522                    participants: view_validators.clone(),
1523                };
1524                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1525                    supervisor_config,
1526                );
1527                supervisors.push(supervisor.clone());
1528                let application_cfg = mocks::application::Config {
1529                    hasher: Sha256::default(),
1530                    relay: relay.clone(),
1531                    participant: validator.clone(),
1532                    propose_latency: (10.0, 5.0),
1533                    verify_latency: (10.0, 5.0),
1534                };
1535                let (actor, application) = mocks::application::Application::new(
1536                    context.with_label("application"),
1537                    application_cfg,
1538                );
1539                actor.start();
1540                let cfg = config::Config {
1541                    crypto: scheme.clone(),
1542                    automaton: application.clone(),
1543                    relay: application.clone(),
1544                    reporter: supervisor.clone(),
1545                    partition: validator.to_string(),
1546                    compression: Some(3),
1547                    supervisor,
1548                    mailbox_size: 1024,
1549                    namespace: namespace.clone(),
1550                    leader_timeout: Duration::from_secs(1),
1551                    notarization_timeout: Duration::from_secs(2),
1552                    nullify_retry: Duration::from_secs(10),
1553                    fetch_timeout: Duration::from_secs(1),
1554                    activity_timeout,
1555                    skip_timeout,
1556                    max_fetch_count: 1,
1557                    max_participants: n as usize,
1558                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1559                    fetch_concurrent: 1,
1560                    replay_buffer: NZUsize!(1024 * 1024),
1561                    write_buffer: NZUsize!(1024 * 1024),
1562                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1563                };
1564                let (voter, resolver) = registrations
1565                    .remove(&validator)
1566                    .expect("validator should be registered");
1567                let engine = Engine::new(context.with_label("engine"), cfg);
1568                engine_handlers.push(engine.start(voter, resolver));
1569            }
1570
1571            // Wait for all engines to finish
1572            let mut finalizers = Vec::new();
1573            for supervisor in supervisors.iter_mut() {
1574                let (mut latest, mut monitor) = supervisor.subscribe().await;
1575                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1576                    while latest < required_containers {
1577                        latest = monitor.next().await.expect("event missing");
1578                    }
1579                }));
1580            }
1581            join_all(finalizers).await;
1582
1583            // Cut all links between validator halves
1584            fn separated(n: usize, a: usize, b: usize) -> bool {
1585                let m = n / 2;
1586                (a < m && b >= m) || (a >= m && b < m)
1587            }
1588            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1589
1590            // Wait for any in-progress notarizations/finalizations to finish
1591            context.sleep(Duration::from_secs(10)).await;
1592
1593            // Wait for a few virtual minutes (shouldn't finalize anything)
1594            let mut finalizers = Vec::new();
1595            for supervisor in supervisors.iter_mut() {
1596                let (_, mut monitor) = supervisor.subscribe().await;
1597                finalizers.push(
1598                    context
1599                        .with_label("finalizer")
1600                        .spawn(move |context| async move {
1601                            select! {
1602                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1603                                _done = monitor.next() => {
1604                                    panic!("engine should not notarize or finalize anything");
1605                                }
1606                            }
1607                        }),
1608                );
1609            }
1610            join_all(finalizers).await;
1611
1612            // Restore links
1613            link_validators(
1614                &mut oracle,
1615                &validators,
1616                Action::Link(link),
1617                Some(separated),
1618            )
1619            .await;
1620
1621            // Wait for all engines to finish
1622            let mut finalizers = Vec::new();
1623            for supervisor in supervisors.iter_mut() {
1624                let (mut latest, mut monitor) = supervisor.subscribe().await;
1625                let required = latest + required_containers;
1626                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1627                    while latest < required {
1628                        latest = monitor.next().await.expect("event missing");
1629                    }
1630                }));
1631            }
1632            join_all(finalizers).await;
1633
1634            // Check supervisors for correct activity
1635            for supervisor in supervisors.iter() {
1636                // Ensure no faults
1637                {
1638                    let faults = supervisor.faults.lock().unwrap();
1639                    assert!(faults.is_empty());
1640                }
1641            }
1642        });
1643    }
1644
1645    fn slow_and_lossy_links(seed: u64) -> String {
1646        // Create context
1647        let n = 5;
1648        let required_containers = 50;
1649        let activity_timeout = 10;
1650        let skip_timeout = 5;
1651        let namespace = b"consensus".to_vec();
1652        let cfg = deterministic::Config::new()
1653            .with_seed(seed)
1654            .with_timeout(Some(Duration::from_secs(5_000)));
1655        let executor = deterministic::Runner::new(cfg);
1656        executor.start(|context| async move {
1657            // Create simulated network
1658            let (network, mut oracle) = Network::new(
1659                context.with_label("network"),
1660                Config {
1661                    max_size: 1024 * 1024,
1662                },
1663            );
1664
1665            // Start network
1666            network.start();
1667
1668            // Register participants
1669            let mut schemes = Vec::new();
1670            let mut validators = Vec::new();
1671            for i in 0..n {
1672                let scheme = PrivateKey::from_seed(i as u64);
1673                let pk = scheme.public_key();
1674                schemes.push(scheme);
1675                validators.push(pk);
1676            }
1677            validators.sort();
1678            schemes.sort_by_key(|s| s.public_key());
1679            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1680            let mut registrations = register_validators(&mut oracle, &validators).await;
1681
1682            // Link all validators
1683            let degraded_link = Link {
1684                latency: 200.0,
1685                jitter: 150.0,
1686                success_rate: 0.5,
1687            };
1688            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1689
1690            // Create engines
1691            let relay = Arc::new(mocks::relay::Relay::new());
1692            let mut supervisors = Vec::new();
1693            let mut engine_handlers = Vec::new();
1694            for scheme in schemes.into_iter() {
1695                // Create scheme context
1696                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1697
1698                // Start engine
1699                let validator = scheme.public_key();
1700                let supervisor_config = mocks::supervisor::Config {
1701                    namespace: namespace.clone(),
1702                    participants: view_validators.clone(),
1703                };
1704                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1705                    supervisor_config,
1706                );
1707                supervisors.push(supervisor.clone());
1708                let application_cfg = mocks::application::Config {
1709                    hasher: Sha256::default(),
1710                    relay: relay.clone(),
1711                    participant: validator.clone(),
1712                    propose_latency: (10.0, 5.0),
1713                    verify_latency: (10.0, 5.0),
1714                };
1715                let (actor, application) = mocks::application::Application::new(
1716                    context.with_label("application"),
1717                    application_cfg,
1718                );
1719                actor.start();
1720                let cfg = config::Config {
1721                    crypto: scheme,
1722                    automaton: application.clone(),
1723                    relay: application.clone(),
1724                    reporter: supervisor.clone(),
1725                    partition: validator.to_string(),
1726                    compression: Some(3),
1727                    supervisor,
1728                    mailbox_size: 1024,
1729                    namespace: namespace.clone(),
1730                    leader_timeout: Duration::from_secs(1),
1731                    notarization_timeout: Duration::from_secs(2),
1732                    nullify_retry: Duration::from_secs(10),
1733                    fetch_timeout: Duration::from_secs(1),
1734                    activity_timeout,
1735                    skip_timeout,
1736                    max_fetch_count: 1,
1737                    max_participants: n as usize,
1738                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1739                    fetch_concurrent: 1,
1740                    replay_buffer: NZUsize!(1024 * 1024),
1741                    write_buffer: NZUsize!(1024 * 1024),
1742                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1743                };
1744                let (voter, resolver) = registrations
1745                    .remove(&validator)
1746                    .expect("validator should be registered");
1747                let engine = Engine::new(context.with_label("engine"), cfg);
1748                engine_handlers.push(engine.start(voter, resolver));
1749            }
1750
1751            // Wait for all engines to finish
1752            let mut finalizers = Vec::new();
1753            for supervisor in supervisors.iter_mut() {
1754                let (mut latest, mut monitor) = supervisor.subscribe().await;
1755                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1756                    while latest < required_containers {
1757                        latest = monitor.next().await.expect("event missing");
1758                    }
1759                }));
1760            }
1761            join_all(finalizers).await;
1762
1763            // Check supervisors for correct activity
1764            for supervisor in supervisors.iter() {
1765                // Ensure no faults
1766                {
1767                    let faults = supervisor.faults.lock().unwrap();
1768                    assert!(faults.is_empty());
1769                }
1770            }
1771
1772            context.auditor().state()
1773        })
1774    }
1775
1776    #[test_traced]
1777    fn test_slow_and_lossy_links() {
1778        slow_and_lossy_links(0);
1779    }
1780
1781    #[test_traced]
1782    #[ignore]
1783    fn test_determinism() {
1784        // We use slow and lossy links as the deterministic test
1785        // because it is the most complex test.
1786        for seed in 1..6 {
1787            // Run test with seed
1788            let state_1 = slow_and_lossy_links(seed);
1789
1790            // Run test again with same seed
1791            let state_2 = slow_and_lossy_links(seed);
1792
1793            // Ensure states are equal
1794            assert_eq!(state_1, state_2);
1795        }
1796    }
1797
1798    fn conflicter(seed: u64) {
1799        // Create context
1800        let n = 4;
1801        let required_containers = 50;
1802        let activity_timeout = 10;
1803        let skip_timeout = 5;
1804        let namespace = b"consensus".to_vec();
1805        let cfg = deterministic::Config::new()
1806            .with_seed(seed)
1807            .with_timeout(Some(Duration::from_secs(30)));
1808        let executor = deterministic::Runner::new(cfg);
1809        executor.start(|context| async move {
1810            // Create simulated network
1811            let (network, mut oracle) = Network::new(
1812                context.with_label("network"),
1813                Config {
1814                    max_size: 1024 * 1024,
1815                },
1816            );
1817
1818            // Start network
1819            network.start();
1820
1821            // Register participants
1822            let mut schemes = Vec::new();
1823            let mut validators = Vec::new();
1824            for i in 0..n {
1825                let scheme = PrivateKey::from_seed(i as u64);
1826                let pk = scheme.public_key();
1827                schemes.push(scheme);
1828                validators.push(pk);
1829            }
1830            validators.sort();
1831            schemes.sort_by_key(|s| s.public_key());
1832            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1833            let mut registrations = register_validators(&mut oracle, &validators).await;
1834
1835            // Link all validators
1836            let link = Link {
1837                latency: 10.0,
1838                jitter: 1.0,
1839                success_rate: 1.0,
1840            };
1841            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1842
1843            // Create engines
1844            let relay = Arc::new(mocks::relay::Relay::new());
1845            let mut supervisors = Vec::new();
1846            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1847                // Create scheme context
1848                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1849
1850                // Start engine
1851                let validator = scheme.public_key();
1852                let supervisor_config = mocks::supervisor::Config {
1853                    namespace: namespace.clone(),
1854                    participants: view_validators.clone(),
1855                };
1856                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1857                    supervisor_config,
1858                );
1859                if idx_scheme == 0 {
1860                    let cfg = mocks::conflicter::Config {
1861                        crypto: scheme,
1862                        supervisor,
1863                        namespace: namespace.clone(),
1864                    };
1865                    let (voter, _) = registrations
1866                        .remove(&validator)
1867                        .expect("validator should be registered");
1868                    let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1869                        mocks::conflicter::Conflicter::new(
1870                            context.with_label("byzantine_engine"),
1871                            cfg,
1872                        );
1873                    engine.start(voter);
1874                } else {
1875                    supervisors.push(supervisor.clone());
1876                    let application_cfg = mocks::application::Config {
1877                        hasher: Sha256::default(),
1878                        relay: relay.clone(),
1879                        participant: validator.clone(),
1880                        propose_latency: (10.0, 5.0),
1881                        verify_latency: (10.0, 5.0),
1882                    };
1883                    let (actor, application) = mocks::application::Application::new(
1884                        context.with_label("application"),
1885                        application_cfg,
1886                    );
1887                    actor.start();
1888                    let cfg = config::Config {
1889                        crypto: scheme,
1890                        automaton: application.clone(),
1891                        relay: application.clone(),
1892                        reporter: supervisor.clone(),
1893                        partition: validator.to_string(),
1894                        compression: Some(3),
1895                        supervisor,
1896                        mailbox_size: 1024,
1897                        namespace: namespace.clone(),
1898                        leader_timeout: Duration::from_secs(1),
1899                        notarization_timeout: Duration::from_secs(2),
1900                        nullify_retry: Duration::from_secs(10),
1901                        fetch_timeout: Duration::from_secs(1),
1902                        activity_timeout,
1903                        skip_timeout,
1904                        max_fetch_count: 1,
1905                        max_participants: n as usize,
1906                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1907                        fetch_concurrent: 1,
1908                        replay_buffer: NZUsize!(1024 * 1024),
1909                        write_buffer: NZUsize!(1024 * 1024),
1910                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1911                    };
1912                    let (voter, resolver) = registrations
1913                        .remove(&validator)
1914                        .expect("validator should be registered");
1915                    let engine = Engine::new(context.with_label("engine"), cfg);
1916                    engine.start(voter, resolver);
1917                }
1918            }
1919
1920            // Wait for all engines to finish
1921            let mut finalizers = Vec::new();
1922            for supervisor in supervisors.iter_mut() {
1923                let (mut latest, mut monitor) = supervisor.subscribe().await;
1924                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1925                    while latest < required_containers {
1926                        latest = monitor.next().await.expect("event missing");
1927                    }
1928                }));
1929            }
1930            join_all(finalizers).await;
1931
1932            // Check supervisors for correct activity
1933            let byz = &validators[0];
1934            let mut count_conflicting_notarize = 0;
1935            let mut count_conflicting_finalize = 0;
1936            for supervisor in supervisors.iter() {
1937                // Ensure only faults for byz
1938                {
1939                    let faults = supervisor.faults.lock().unwrap();
1940                    assert_eq!(faults.len(), 1);
1941                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
1942                    for (_, faults) in faulter.iter() {
1943                        for fault in faults.iter() {
1944                            match fault {
1945                                Activity::ConflictingNotarize(_) => {
1946                                    count_conflicting_notarize += 1;
1947                                }
1948                                Activity::ConflictingFinalize(_) => {
1949                                    count_conflicting_finalize += 1;
1950                                }
1951                                _ => panic!("unexpected fault: {fault:?}"),
1952                            }
1953                        }
1954                    }
1955                }
1956            }
1957            assert!(count_conflicting_notarize > 0);
1958            assert!(count_conflicting_finalize > 0);
1959        });
1960    }
1961
1962    #[test_traced]
1963    #[ignore]
1964    fn test_conflicter() {
1965        for seed in 0..5 {
1966            conflicter(seed);
1967        }
1968    }
1969
1970    fn nuller(seed: u64) {
1971        // Create context
1972        let n = 4;
1973        let required_containers = 50;
1974        let activity_timeout = 10;
1975        let skip_timeout = 5;
1976        let namespace = b"consensus".to_vec();
1977        let cfg = deterministic::Config::new()
1978            .with_seed(seed)
1979            .with_timeout(Some(Duration::from_secs(30)));
1980        let executor = deterministic::Runner::new(cfg);
1981        executor.start(|context| async move {
1982            // Create simulated network
1983            let (network, mut oracle) = Network::new(
1984                context.with_label("network"),
1985                Config {
1986                    max_size: 1024 * 1024,
1987                },
1988            );
1989
1990            // Start network
1991            network.start();
1992
1993            // Register participants
1994            let mut schemes = Vec::new();
1995            let mut validators = Vec::new();
1996            for i in 0..n {
1997                let scheme = PrivateKey::from_seed(i as u64);
1998                let pk = scheme.public_key();
1999                schemes.push(scheme);
2000                validators.push(pk);
2001            }
2002            validators.sort();
2003            schemes.sort_by_key(|s| s.public_key());
2004            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2005            let mut registrations = register_validators(&mut oracle, &validators).await;
2006
2007            // Link all validators
2008            let link = Link {
2009                latency: 10.0,
2010                jitter: 1.0,
2011                success_rate: 1.0,
2012            };
2013            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2014
2015            // Create engines
2016            let relay = Arc::new(mocks::relay::Relay::new());
2017            let mut supervisors = Vec::new();
2018            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2019                // Create scheme context
2020                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2021
2022                // Start engine
2023                let validator = scheme.public_key();
2024                let supervisor_config = mocks::supervisor::Config {
2025                    namespace: namespace.clone(),
2026                    participants: view_validators.clone(),
2027                };
2028                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2029                    supervisor_config,
2030                );
2031                if idx_scheme == 0 {
2032                    let cfg = mocks::nuller::Config {
2033                        crypto: scheme,
2034                        supervisor,
2035                        namespace: namespace.clone(),
2036                    };
2037                    let (voter, _) = registrations
2038                        .remove(&validator)
2039                        .expect("validator should be registered");
2040                    let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2041                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2042                    engine.start(voter);
2043                } else {
2044                    supervisors.push(supervisor.clone());
2045                    let application_cfg = mocks::application::Config {
2046                        hasher: Sha256::default(),
2047                        relay: relay.clone(),
2048                        participant: validator.clone(),
2049                        propose_latency: (10.0, 5.0),
2050                        verify_latency: (10.0, 5.0),
2051                    };
2052                    let (actor, application) = mocks::application::Application::new(
2053                        context.with_label("application"),
2054                        application_cfg,
2055                    );
2056                    actor.start();
2057                    let cfg = config::Config {
2058                        crypto: scheme,
2059                        automaton: application.clone(),
2060                        relay: application.clone(),
2061                        reporter: supervisor.clone(),
2062                        partition: validator.to_string(),
2063                        compression: Some(3),
2064                        supervisor,
2065                        mailbox_size: 1024,
2066                        namespace: namespace.clone(),
2067                        leader_timeout: Duration::from_secs(1),
2068                        notarization_timeout: Duration::from_secs(2),
2069                        nullify_retry: Duration::from_secs(10),
2070                        fetch_timeout: Duration::from_secs(1),
2071                        activity_timeout,
2072                        skip_timeout,
2073                        max_fetch_count: 1,
2074                        max_participants: n as usize,
2075                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2076                        fetch_concurrent: 1,
2077                        replay_buffer: NZUsize!(1024 * 1024),
2078                        write_buffer: NZUsize!(1024 * 1024),
2079                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2080                    };
2081                    let (voter, resolver) = registrations
2082                        .remove(&validator)
2083                        .expect("validator should be registered");
2084                    let engine = Engine::new(context.with_label("engine"), cfg);
2085                    engine.start(voter, resolver);
2086                }
2087            }
2088
2089            // Wait for all engines to finish
2090            let mut finalizers = Vec::new();
2091            for supervisor in supervisors.iter_mut() {
2092                let (mut latest, mut monitor) = supervisor.subscribe().await;
2093                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2094                    while latest < required_containers {
2095                        latest = monitor.next().await.expect("event missing");
2096                    }
2097                }));
2098            }
2099            join_all(finalizers).await;
2100
2101            // Check supervisors for correct activity
2102            let byz = &validators[0];
2103            let mut count_nullify_and_finalize = 0;
2104            for supervisor in supervisors.iter() {
2105                // Ensure only faults for byz
2106                {
2107                    let faults = supervisor.faults.lock().unwrap();
2108                    assert_eq!(faults.len(), 1);
2109                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2110                    for (_, faults) in faulter.iter() {
2111                        for fault in faults.iter() {
2112                            match fault {
2113                                Activity::NullifyFinalize(_) => {
2114                                    count_nullify_and_finalize += 1;
2115                                }
2116                                _ => panic!("unexpected fault: {fault:?}"),
2117                            }
2118                        }
2119                    }
2120                }
2121            }
2122            assert!(count_nullify_and_finalize > 0);
2123        });
2124    }
2125
2126    #[test_traced]
2127    #[ignore]
2128    fn test_nuller() {
2129        for seed in 0..5 {
2130            nuller(seed);
2131        }
2132    }
2133
2134    fn outdated(seed: u64) {
2135        // Create context
2136        let n = 4;
2137        let required_containers = 100;
2138        let activity_timeout = 10;
2139        let skip_timeout = 5;
2140        let namespace = b"consensus".to_vec();
2141        let cfg = deterministic::Config::new()
2142            .with_seed(seed)
2143            .with_timeout(Some(Duration::from_secs(30)));
2144        let executor = deterministic::Runner::new(cfg);
2145        executor.start(|context| async move {
2146            // Create simulated network
2147            let (network, mut oracle) = Network::new(
2148                context.with_label("network"),
2149                Config {
2150                    max_size: 1024 * 1024,
2151                },
2152            );
2153
2154            // Start network
2155            network.start();
2156
2157            // Register participants
2158            let mut schemes = Vec::new();
2159            let mut validators = Vec::new();
2160            for i in 0..n {
2161                let scheme = PrivateKey::from_seed(i as u64);
2162                let pk = scheme.public_key();
2163                schemes.push(scheme);
2164                validators.push(pk);
2165            }
2166            validators.sort();
2167            schemes.sort_by_key(|s| s.public_key());
2168            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2169            let mut registrations = register_validators(&mut oracle, &validators).await;
2170
2171            // Link all validators
2172            let link = Link {
2173                latency: 10.0,
2174                jitter: 1.0,
2175                success_rate: 1.0,
2176            };
2177            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2178
2179            // Create engines
2180            let relay = Arc::new(mocks::relay::Relay::new());
2181            let mut supervisors = Vec::new();
2182            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2183                // Create scheme context
2184                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2185
2186                // Start engine
2187                let validator = scheme.public_key();
2188                let supervisor_config = mocks::supervisor::Config {
2189                    namespace: namespace.clone(),
2190                    participants: view_validators.clone(),
2191                };
2192                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2193                    supervisor_config,
2194                );
2195                if idx_scheme == 0 {
2196                    let cfg = mocks::outdated::Config {
2197                        crypto: scheme,
2198                        supervisor,
2199                        namespace: namespace.clone(),
2200                        view_delta: activity_timeout * 4,
2201                    };
2202                    let (voter, _) = registrations
2203                        .remove(&validator)
2204                        .expect("validator should be registered");
2205                    let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2206                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2207                    engine.start(voter);
2208                } else {
2209                    supervisors.push(supervisor.clone());
2210                    let application_cfg = mocks::application::Config {
2211                        hasher: Sha256::default(),
2212                        relay: relay.clone(),
2213                        participant: validator.clone(),
2214                        propose_latency: (10.0, 5.0),
2215                        verify_latency: (10.0, 5.0),
2216                    };
2217                    let (actor, application) = mocks::application::Application::new(
2218                        context.with_label("application"),
2219                        application_cfg,
2220                    );
2221                    actor.start();
2222                    let cfg = config::Config {
2223                        crypto: scheme,
2224                        automaton: application.clone(),
2225                        relay: application.clone(),
2226                        reporter: supervisor.clone(),
2227                        supervisor,
2228                        partition: validator.to_string(),
2229                        compression: Some(3),
2230                        mailbox_size: 1024,
2231                        namespace: namespace.clone(),
2232                        leader_timeout: Duration::from_secs(1),
2233                        notarization_timeout: Duration::from_secs(2),
2234                        nullify_retry: Duration::from_secs(10),
2235                        fetch_timeout: Duration::from_secs(1),
2236                        activity_timeout,
2237                        skip_timeout,
2238                        max_fetch_count: 1,
2239                        max_participants: n as usize,
2240                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2241                        fetch_concurrent: 1,
2242                        replay_buffer: NZUsize!(1024 * 1024),
2243                        write_buffer: NZUsize!(1024 * 1024),
2244                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2245                    };
2246                    let (voter, resolver) = registrations
2247                        .remove(&validator)
2248                        .expect("validator should be registered");
2249                    let engine = Engine::new(context.with_label("engine"), cfg);
2250                    engine.start(voter, resolver);
2251                }
2252            }
2253
2254            // Wait for all engines to finish
2255            let mut finalizers = Vec::new();
2256            for supervisor in supervisors.iter_mut() {
2257                let (mut latest, mut monitor) = supervisor.subscribe().await;
2258                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2259                    while latest < required_containers {
2260                        latest = monitor.next().await.expect("event missing");
2261                    }
2262                }));
2263            }
2264            join_all(finalizers).await;
2265
2266            // Ensure no faults
2267            for supervisor in supervisors.iter() {
2268                {
2269                    let faults = supervisor.faults.lock().unwrap();
2270                    assert!(faults.is_empty());
2271                }
2272            }
2273        });
2274    }
2275
2276    #[test_traced]
2277    #[ignore]
2278    fn test_outdated() {
2279        for seed in 0..5 {
2280            outdated(seed);
2281        }
2282    }
2283
2284    #[test_traced]
2285    #[ignore]
2286    fn test_1k() {
2287        // Create context
2288        let n = 10;
2289        let required_containers = 1_000;
2290        let activity_timeout = 10;
2291        let skip_timeout = 5;
2292        let namespace = b"consensus".to_vec();
2293        let cfg = deterministic::Config::new();
2294        let executor = deterministic::Runner::new(cfg);
2295        executor.start(|context| async move {
2296            // Create simulated network
2297            let (network, mut oracle) = Network::new(
2298                context.with_label("network"),
2299                Config {
2300                    max_size: 1024 * 1024,
2301                },
2302            );
2303
2304            // Start network
2305            network.start();
2306
2307            // Register participants
2308            let mut schemes = Vec::new();
2309            let mut validators = Vec::new();
2310            for i in 0..n {
2311                let scheme = PrivateKey::from_seed(i as u64);
2312                let pk = scheme.public_key();
2313                schemes.push(scheme);
2314                validators.push(pk);
2315            }
2316            validators.sort();
2317            schemes.sort_by_key(|s| s.public_key());
2318            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2319            let mut registrations = register_validators(&mut oracle, &validators).await;
2320
2321            // Link all validators
2322            let link = Link {
2323                latency: 80.0,
2324                jitter: 10.0,
2325                success_rate: 0.98,
2326            };
2327            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2328
2329            // Create engines
2330            let relay = Arc::new(mocks::relay::Relay::new());
2331            let mut supervisors = Vec::new();
2332            let mut engine_handlers = Vec::new();
2333            for scheme in schemes.into_iter() {
2334                // Create scheme context
2335                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2336
2337                // Start engine
2338                let validator = scheme.public_key();
2339                let supervisor_config = mocks::supervisor::Config {
2340                    namespace: namespace.clone(),
2341                    participants: view_validators.clone(),
2342                };
2343                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2344                    supervisor_config,
2345                );
2346                supervisors.push(supervisor.clone());
2347                let application_cfg = mocks::application::Config {
2348                    hasher: Sha256::default(),
2349                    relay: relay.clone(),
2350                    participant: validator.clone(),
2351                    propose_latency: (100.0, 50.0),
2352                    verify_latency: (50.0, 40.0),
2353                };
2354                let (actor, application) = mocks::application::Application::new(
2355                    context.with_label("application"),
2356                    application_cfg,
2357                );
2358                actor.start();
2359                let cfg = config::Config {
2360                    crypto: scheme,
2361                    automaton: application.clone(),
2362                    relay: application.clone(),
2363                    reporter: supervisor.clone(),
2364                    partition: validator.to_string(),
2365                    compression: Some(3),
2366                    supervisor,
2367                    mailbox_size: 1024,
2368                    namespace: namespace.clone(),
2369                    leader_timeout: Duration::from_secs(1),
2370                    notarization_timeout: Duration::from_secs(2),
2371                    nullify_retry: Duration::from_secs(10),
2372                    fetch_timeout: Duration::from_secs(1),
2373                    activity_timeout,
2374                    skip_timeout,
2375                    max_fetch_count: 1,
2376                    max_participants: n as usize,
2377                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2378                    fetch_concurrent: 1,
2379                    replay_buffer: NZUsize!(1024 * 1024),
2380                    write_buffer: NZUsize!(1024 * 1024),
2381                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2382                };
2383                let (voter, resolver) = registrations
2384                    .remove(&validator)
2385                    .expect("validator should be registered");
2386                let engine = Engine::new(context.with_label("engine"), cfg);
2387                engine_handlers.push(engine.start(voter, resolver));
2388            }
2389
2390            // Wait for all engines to finish
2391            let mut finalizers = Vec::new();
2392            for supervisor in supervisors.iter_mut() {
2393                let (mut latest, mut monitor) = supervisor.subscribe().await;
2394                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2395                    while latest < required_containers {
2396                        latest = monitor.next().await.expect("event missing");
2397                    }
2398                }));
2399            }
2400            join_all(finalizers).await;
2401
2402            // Check supervisors for correct activity
2403            for supervisor in supervisors.iter() {
2404                // Ensure no faults
2405                {
2406                    let faults = supervisor.faults.lock().unwrap();
2407                    assert!(faults.is_empty());
2408                }
2409            }
2410        })
2411    }
2412}