commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides
4//! simple and fast BFT agreement with network-speed view (i.e. block time) latency and optimal
5//! finalization latency in a partially synchronous setting.
6//!
7//! _If your application would benefit from succinct consensus certificates or a bias-resistant
8//! VRF, see [crate::threshold_simplex]._
9//!
10//! # Features
11//!
12//! * Wicked Fast Block Times (2 Network Hops)
13//! * Optimal Finalization Latency (3 Network Hops)
14//! * Externalized Uptime and Fault Proofs
15//! * Decoupled Block Broadcast and Sync
16//! * Flexible Block Format
17//!
18//! # Design
19//!
20//! ## Architecture
21//!
22//! All logic is split into three components: the `Voter`, the `Resolver`, and the `Application` (provided by the user).
23//! The `Voter` is responsible for participating in the latest view and the `Resolver` is responsible for fetching artifacts
24//! from previous views required to verify proposed blocks in the latest view.
25//!
26//! To provide great performance, all interactions between `Voter`, `Resolver`, and `Application` are
27//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
28//! `Application` verifies a proposed block or the `Resolver` verifies a notarization.
29//!
30//! ```txt
31//! +---------------+           +---------+            +++++++++++++++
32//! |               |<----------+         +----------->+             +
33//! |  Application  |           |  Voter  |            +    Peers    +
34//! |               +---------->|         |<-----------+             +
35//! +---------------+           +--+------+            +++++++++++++++
36//!                                |   ^
37//!                                |   |
38//!                                |   |
39//!                                |   |
40//!                                v   |
41//!                            +-------+----+          +++++++++++++++
42//!                            |            +--------->+             +
43//!                            |  Resolver  |          +    Peers    +
44//!                            |            |<---------+             +
45//!                            +------------+          +++++++++++++++
46//! ```
47//!
48//! _Application is usually a single object that implements the `Automaton`, `Relay`, `Committer`,
49//! and `Supervisor` traits._
50//!
51//! ## Joining Consensus
52//!
53//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter`
54//! will enter `v+1`. This means that a new participant joining consensus will immediately jump
55//! ahead to the latest view and begin participating in consensus (assuming it can verify blocks).
56//!
57//! ## Persistence
58//!
59//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
60//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
61//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::variable::Journal].
62//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
63//! on restart (especially in the case of unclean shutdown).
64//!
65//! ## Protocol Description
66//!
67//! ### Specification for View `v`
68//!
69//! Upon entering view `v`:
70//! * Determine leader `l` for view `v`
71//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
72//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
73//! * If leader `l`, broadcast `notarize(c,v)`
74//!   * If can't propose container in view `v` because missing notarization/nullification for a
75//!     previous view `v_m`, request `v_m`
76//!
77//! Upon receiving first `notarize(c,v)` from `l`:
78//! * Cancel `t_l`
79//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
80//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
81//!
82//! Upon receiving `2f+1` `notarize(c,v)`:
83//! * Cancel `t_a`
84//! * Mark `c` as notarized
85//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
86//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
87//! * Enter `v+1`
88//!
89//! Upon receiving `2f+1` `nullify(v)`:
90//! * Broadcast `nullification(v)`
91//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
92//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is than last finalized, broadcast last finalization
93//!       instead.
94//! * Enter `v+1`
95//!
96//! Upon receiving `2f+1` `finalize(c,v)`:
97//! * Mark `c` as finalized (and recursively finalize its parents)
98//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
99//!
100//! Upon `t_l` or `t_a` firing:
101//! * Broadcast `nullify(v)`
102//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
103//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
104//!
105//! ### Deviations from Simplex Consensus
106//!
107//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
108//!   a set of all notarizations/nullifications for all historical blocks.
109//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
110//!   either a "block" or a "dummy block", respectively.
111//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
112//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
113//!   some number of views (again to trigger early view transition for an unresponsive leader).
114//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
115//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
116
117pub mod types;
118
119cfg_if::cfg_if! {
120    if #[cfg(not(target_arch = "wasm32"))] {
121        mod actors;
122        mod config;
123        pub use config::Config;
124        mod engine;
125        pub use engine::Engine;
126        mod metrics;
127    }
128}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use crate::Monitor;
137    use commonware_cryptography::{
138        ed25519::{PrivateKey, PublicKey},
139        sha256::Digest as Sha256Digest,
140        PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141    };
142    use commonware_macros::{select, test_traced};
143    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
145    use commonware_utils::{quorum, NZUsize, NZU32};
146    use engine::Engine;
147    use futures::{future::join_all, StreamExt};
148    use governor::Quota;
149    use rand::Rng as _;
150    use std::{
151        collections::{BTreeMap, HashMap},
152        num::NonZeroUsize,
153        sync::{Arc, Mutex},
154        time::Duration,
155    };
156    use tracing::debug;
157    use types::Activity;
158
159    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
160    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
161
162    /// Registers all validators using the oracle.
163    async fn register_validators<P: CPublicKey>(
164        oracle: &mut Oracle<P>,
165        validators: &[P],
166    ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
167        let mut registrations = HashMap::new();
168        for validator in validators.iter() {
169            let (voter_sender, voter_receiver) =
170                oracle.register(validator.clone(), 0).await.unwrap();
171            let (resolver_sender, resolver_receiver) =
172                oracle.register(validator.clone(), 1).await.unwrap();
173            registrations.insert(
174                validator.clone(),
175                (
176                    (voter_sender, voter_receiver),
177                    (resolver_sender, resolver_receiver),
178                ),
179            );
180        }
181        registrations
182    }
183
184    /// Enum to describe the action to take when linking validators.
185    enum Action {
186        Link(Link),
187        Update(Link), // Unlink and then link
188        Unlink,
189    }
190
191    /// Links (or unlinks) validators using the oracle.
192    ///
193    /// The `action` parameter determines the action (e.g. link, unlink) to take.
194    /// The `restrict_to` function can be used to restrict the linking to certain connections,
195    /// otherwise all validators will be linked to all other validators.
196    async fn link_validators<P: CPublicKey>(
197        oracle: &mut Oracle<P>,
198        validators: &[P],
199        action: Action,
200        restrict_to: Option<fn(usize, usize, usize) -> bool>,
201    ) {
202        for (i1, v1) in validators.iter().enumerate() {
203            for (i2, v2) in validators.iter().enumerate() {
204                // Ignore self
205                if v2 == v1 {
206                    continue;
207                }
208
209                // Restrict to certain connections
210                if let Some(f) = restrict_to {
211                    if !f(validators.len(), i1, i2) {
212                        continue;
213                    }
214                }
215
216                // Do any unlinking first
217                match action {
218                    Action::Update(_) | Action::Unlink => {
219                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
220                    }
221                    _ => {}
222                }
223
224                // Do any linking after
225                match action {
226                    Action::Link(ref link) | Action::Update(ref link) => {
227                        oracle
228                            .add_link(v1.clone(), v2.clone(), link.clone())
229                            .await
230                            .unwrap();
231                    }
232                    _ => {}
233                }
234            }
235        }
236    }
237
238    #[test_traced]
239    fn test_all_online() {
240        // Create context
241        let n = 5;
242        let threshold = quorum(n);
243        let max_exceptions = 10;
244        let required_containers = 100;
245        let activity_timeout = 10;
246        let skip_timeout = 5;
247        let namespace = b"consensus".to_vec();
248        let executor = deterministic::Runner::timed(Duration::from_secs(30));
249        executor.start(|context| async move {
250            // Create simulated network
251            let (network, mut oracle) = Network::new(
252                context.with_label("network"),
253                Config {
254                    max_size: 1024 * 1024,
255                },
256            );
257
258            // Start network
259            network.start();
260
261            // Register participants
262            let mut schemes = Vec::new();
263            let mut validators = Vec::new();
264            for i in 0..n {
265                let scheme = PrivateKey::from_seed(i as u64);
266                let pk = scheme.public_key();
267                schemes.push(scheme);
268                validators.push(pk);
269            }
270            validators.sort();
271            schemes.sort_by_key(|s| s.public_key());
272            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
273            let mut registrations = register_validators(&mut oracle, &validators).await;
274
275            // Link all validators
276            let link = Link {
277                latency: Duration::from_millis(10),
278                jitter: Duration::from_millis(1),
279                success_rate: 1.0,
280            };
281            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
282
283            // Create engines
284            let relay = Arc::new(mocks::relay::Relay::new());
285            let mut supervisors = Vec::new();
286            let mut engine_handlers = Vec::new();
287            for scheme in schemes.into_iter() {
288                // Create scheme context
289                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
290
291                // Start engine
292                let validator = scheme.public_key();
293                let supervisor_config = mocks::supervisor::Config {
294                    namespace: namespace.clone(),
295                    participants: view_validators.clone(),
296                };
297                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
298                    supervisor_config,
299                );
300                supervisors.push(supervisor.clone());
301                let application_cfg = mocks::application::Config {
302                    hasher: Sha256::default(),
303                    relay: relay.clone(),
304                    participant: validator.clone(),
305                    propose_latency: (10.0, 5.0),
306                    verify_latency: (10.0, 5.0),
307                };
308                let (actor, application) = mocks::application::Application::new(
309                    context.with_label("application"),
310                    application_cfg,
311                );
312                actor.start();
313                let cfg = config::Config {
314                    crypto: scheme,
315                    automaton: application.clone(),
316                    relay: application.clone(),
317                    reporter: supervisor.clone(),
318                    supervisor,
319                    partition: validator.to_string(),
320                    mailbox_size: 1024,
321                    namespace: namespace.clone(),
322                    leader_timeout: Duration::from_secs(1),
323                    notarization_timeout: Duration::from_secs(2),
324                    nullify_retry: Duration::from_secs(10),
325                    fetch_timeout: Duration::from_secs(1),
326                    activity_timeout,
327                    skip_timeout,
328                    max_fetch_count: 1,
329                    max_participants: n as usize,
330                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
331                    fetch_concurrent: 1,
332                    replay_buffer: NZUsize!(1024 * 1024),
333                    write_buffer: NZUsize!(1024 * 1024),
334                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
335                };
336                let engine = Engine::new(context.with_label("engine"), cfg);
337                let (voter, resolver) = registrations
338                    .remove(&validator)
339                    .expect("validator should be registered");
340                engine_handlers.push(engine.start(voter, resolver));
341            }
342
343            // Wait for all engines to finish
344            let mut finalizers = Vec::new();
345            for supervisor in supervisors.iter_mut() {
346                let (mut latest, mut monitor) = supervisor.subscribe().await;
347                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
348                    while latest < required_containers {
349                        latest = monitor.next().await.expect("event missing");
350                    }
351                }));
352            }
353            join_all(finalizers).await;
354
355            // Check supervisors for correct activity
356            let latest_complete = required_containers - activity_timeout;
357            for supervisor in supervisors.iter() {
358                // Ensure no faults
359                {
360                    let faults = supervisor.faults.lock().unwrap();
361                    assert!(faults.is_empty());
362                }
363
364                // Ensure no forks
365                let mut exceptions = 0;
366                let mut notarized = HashMap::new();
367                let mut finalized = HashMap::new();
368                {
369                    let notarizes = supervisor.notarizes.lock().unwrap();
370                    for view in 1..latest_complete {
371                        // Ensure only one payload proposed per view
372                        let Some(payloads) = notarizes.get(&view) else {
373                            exceptions += 1;
374                            continue;
375                        };
376                        if payloads.len() > 1 {
377                            panic!("view: {view}");
378                        }
379                        let (digest, notarizers) = payloads.iter().next().unwrap();
380                        notarized.insert(view, *digest);
381
382                        if notarizers.len() < threshold as usize {
383                            // We can't verify that everyone participated at every view because some nodes may
384                            // have started later.
385                            panic!("view: {view}");
386                        }
387                        if notarizers.len() != n as usize {
388                            exceptions += 1;
389                        }
390                    }
391                }
392                {
393                    let notarizations = supervisor.notarizations.lock().unwrap();
394                    for view in 1..latest_complete {
395                        // Ensure notarization matches digest from notarizes
396                        let Some(notarization) = notarizations.get(&view) else {
397                            exceptions += 1;
398                            continue;
399                        };
400                        let Some(digest) = notarized.get(&view) else {
401                            exceptions += 1;
402                            continue;
403                        };
404                        assert_eq!(&notarization.proposal.payload, digest);
405                    }
406                }
407                {
408                    let finalizes = supervisor.finalizes.lock().unwrap();
409                    for view in 1..latest_complete {
410                        // Ensure only one payload proposed per view
411                        let Some(payloads) = finalizes.get(&view) else {
412                            exceptions += 1;
413                            continue;
414                        };
415                        if payloads.len() > 1 {
416                            panic!("view: {view}");
417                        }
418                        let (digest, finalizers) = payloads.iter().next().unwrap();
419                        finalized.insert(view, *digest);
420
421                        // Only check at views below timeout
422                        if view > latest_complete {
423                            continue;
424                        }
425
426                        // Ensure everyone participating
427                        if finalizers.len() < threshold as usize {
428                            // We can't verify that everyone participated at every view because some nodes may
429                            // have started later.
430                            panic!("view: {view}");
431                        }
432                        if finalizers.len() != n as usize {
433                            exceptions += 1;
434                        }
435
436                        // Ensure no nullifies for any finalizers
437                        let nullifies = supervisor.nullifies.lock().unwrap();
438                        let Some(nullifies) = nullifies.get(&view) else {
439                            continue;
440                        };
441                        for (_, finalizers) in payloads.iter() {
442                            for finalizer in finalizers.iter() {
443                                if nullifies.contains(finalizer) {
444                                    panic!("should not nullify and finalize at same view");
445                                }
446                            }
447                        }
448                    }
449                }
450                {
451                    let finalizations = supervisor.finalizations.lock().unwrap();
452                    for view in 1..latest_complete {
453                        // Ensure finalization matches digest from finalizes
454                        let Some(finalization) = finalizations.get(&view) else {
455                            exceptions += 1;
456                            continue;
457                        };
458                        let Some(digest) = finalized.get(&view) else {
459                            exceptions += 1;
460                            continue;
461                        };
462                        assert_eq!(&finalization.proposal.payload, digest);
463                    }
464                }
465
466                // Ensure exceptions within allowed
467                assert!(exceptions <= max_exceptions);
468            }
469        });
470    }
471
472    #[test_traced]
473    fn test_observer() {
474        // Create context
475        let n_active = 5;
476        let required_containers = 100;
477        let activity_timeout = 10;
478        let skip_timeout = 5;
479        let namespace = b"consensus".to_vec();
480        let executor = deterministic::Runner::timed(Duration::from_secs(30));
481        executor.start(|context| async move {
482            // Create simulated network
483            let (network, mut oracle) = Network::new(
484                context.with_label("network"),
485                Config {
486                    max_size: 1024 * 1024,
487                },
488            );
489
490            // Start network
491            network.start();
492
493            // Register participants
494            let mut schemes = Vec::new();
495            let mut validators_active = Vec::new();
496            for i in 0..n_active {
497                let scheme = PrivateKey::from_seed(i as u64);
498                let pk = scheme.public_key();
499                schemes.push(scheme);
500                validators_active.push(pk);
501            }
502            validators_active.sort();
503
504            // Add observer not in participants
505            let scheme_observer = PrivateKey::from_seed(n_active as u64);
506            let pk_observer = scheme_observer.public_key();
507            schemes.push(scheme_observer);
508
509            // Register all (including observer) with the network
510            let mut all_validators = validators_active.clone();
511            all_validators.push(pk_observer.clone());
512            all_validators.sort();
513            let mut registrations = register_validators(&mut oracle, &all_validators).await;
514
515            // Link all peers (including observer)
516            let link = Link {
517                latency: Duration::from_millis(10),
518                jitter: Duration::from_millis(1),
519                success_rate: 1.0,
520            };
521            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
522
523            // Create engines
524            let relay = Arc::new(mocks::relay::Relay::new());
525            let mut supervisors = Vec::new();
526            let participants = BTreeMap::from_iter(vec![(0, validators_active.clone())]);
527            for scheme in schemes.into_iter() {
528                // Create scheme context
529                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
530
531                // Create supervisor
532                let supervisor_config = mocks::supervisor::Config {
533                    namespace: namespace.clone(),
534                    participants: participants.clone(),
535                };
536                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
537                    supervisor_config,
538                );
539                supervisors.push(supervisor.clone());
540
541                // Configure application
542                let validator = scheme.public_key();
543                let application_cfg = mocks::application::Config {
544                    hasher: Sha256::default(),
545                    relay: relay.clone(),
546                    participant: validator.clone(),
547                    propose_latency: (10.0, 5.0),
548                    verify_latency: (10.0, 5.0),
549                };
550                let (actor, application) = mocks::application::Application::new(
551                    context.with_label("application"),
552                    application_cfg,
553                );
554                actor.start();
555
556                // Configure engine
557                let cfg = config::Config {
558                    crypto: scheme,
559                    automaton: application.clone(),
560                    relay: application.clone(),
561                    reporter: supervisor.clone(),
562                    supervisor,
563                    partition: validator.to_string(),
564                    mailbox_size: 1024,
565                    namespace: namespace.clone(),
566                    leader_timeout: Duration::from_secs(1),
567                    notarization_timeout: Duration::from_secs(2),
568                    nullify_retry: Duration::from_secs(10),
569                    fetch_timeout: Duration::from_secs(1),
570                    activity_timeout,
571                    skip_timeout,
572                    max_participants: n_active as usize,
573                    max_fetch_count: 1,
574                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
575                    fetch_concurrent: 1,
576                    replay_buffer: NZUsize!(1024 * 1024),
577                    write_buffer: NZUsize!(1024 * 1024),
578                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
579                };
580                let engine = Engine::new(context.with_label("engine"), cfg);
581                let (voter, resolver) = registrations
582                    .remove(&validator)
583                    .expect("validator should be registered");
584                engine.start(voter, resolver);
585            }
586
587            // Wait for all engines (including the observer) to finish
588            let mut finalizers = Vec::new();
589            for supervisor in supervisors.iter_mut() {
590                let (mut latest, mut monitor) = supervisor.subscribe().await;
591                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
592                    while latest < required_containers {
593                        latest = monitor.next().await.expect("event missing");
594                    }
595                }));
596            }
597            join_all(finalizers).await;
598        });
599    }
600
601    #[test_traced]
602    fn test_unclean_shutdown() {
603        // Create context
604        let n = 5;
605        let required_containers = 100;
606        let activity_timeout = 10;
607        let skip_timeout = 5;
608        let namespace = b"consensus".to_vec();
609
610        // Random restarts every x seconds
611        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
612        let supervised = Arc::new(Mutex::new(Vec::new()));
613        let mut prev_ctx = None;
614
615        loop {
616            let namespace = namespace.clone();
617            let shutdowns = shutdowns.clone();
618            let supervised = supervised.clone();
619
620            let f = |mut context: deterministic::Context| async move {
621                // Create simulated network
622                let (network, mut oracle) = Network::new(
623                    context.with_label("network"),
624                    Config {
625                        max_size: 1024 * 1024,
626                    },
627                );
628
629                // Start network
630                network.start();
631
632                // Register participants
633                let mut schemes = Vec::new();
634                let mut validators = Vec::new();
635                for i in 0..n {
636                    let scheme = PrivateKey::from_seed(i as u64);
637                    let pk = scheme.public_key();
638                    schemes.push(scheme);
639                    validators.push(pk);
640                }
641                validators.sort();
642                schemes.sort_by_key(|s| s.public_key());
643                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
644                let mut registrations = register_validators(&mut oracle, &validators).await;
645
646                // Link all validators
647                let link = Link {
648                    latency: Duration::from_millis(50),
649                    jitter: Duration::from_millis(50),
650                    success_rate: 1.0,
651                };
652                link_validators(&mut oracle, &validators, Action::Link(link), None).await;
653
654                // Create engines
655                let relay = Arc::new(mocks::relay::Relay::new());
656                let mut supervisors = HashMap::new();
657                let mut engine_handlers = Vec::new();
658                for scheme in schemes.into_iter() {
659                    // Create scheme context
660                    let context = context
661                        .clone()
662                        .with_label(&format!("validator-{}", scheme.public_key()));
663
664                    // Start engine
665                    let validator = scheme.public_key();
666                    let supervisor_config = mocks::supervisor::Config {
667                        namespace: namespace.clone(),
668                        participants: view_validators.clone(),
669                    };
670                    let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
671                        supervisor_config,
672                    );
673                    supervisors.insert(validator.clone(), supervisor.clone());
674                    let application_cfg = mocks::application::Config {
675                        hasher: Sha256::default(),
676                        relay: relay.clone(),
677                        participant: validator.clone(),
678                        propose_latency: (10.0, 5.0),
679                        verify_latency: (10.0, 5.0),
680                    };
681                    let (actor, application) = mocks::application::Application::new(
682                        context.with_label("application"),
683                        application_cfg,
684                    );
685                    actor.start();
686                    let cfg = config::Config {
687                        crypto: scheme,
688                        automaton: application.clone(),
689                        relay: application.clone(),
690                        reporter: supervisor.clone(),
691                        supervisor,
692                        partition: validator.to_string(),
693                        mailbox_size: 1024,
694                        namespace: namespace.clone(),
695                        leader_timeout: Duration::from_secs(1),
696                        notarization_timeout: Duration::from_secs(2),
697                        nullify_retry: Duration::from_secs(10),
698                        fetch_timeout: Duration::from_secs(1),
699                        activity_timeout,
700                        skip_timeout,
701                        max_participants: n as usize,
702                        max_fetch_count: 1,
703                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
704                        fetch_concurrent: 1,
705                        replay_buffer: NZUsize!(1024 * 1024),
706                        write_buffer: NZUsize!(1024 * 1024),
707                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
708                    };
709                    let engine = Engine::new(context.with_label("engine"), cfg);
710                    let (voter_network, resolver_network) = registrations
711                        .remove(&validator)
712                        .expect("validator should be registered");
713                    engine_handlers.push(engine.start(voter_network, resolver_network));
714                }
715
716                // Store all finalizer handles
717                let mut finalizers = Vec::new();
718                for (_, supervisor) in supervisors.iter_mut() {
719                    let (mut latest, mut monitor) = supervisor.subscribe().await;
720                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
721                        while latest < required_containers {
722                            latest = monitor.next().await.expect("event missing");
723                        }
724                    }));
725                }
726
727                // Exit at random points for unclean shutdown of entire set
728                let wait =
729                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
730                select! {
731                    _ = context.sleep(wait) => {
732                        // Collect supervisors to check faults
733                        {
734                            let mut shutdowns = shutdowns.lock().unwrap();
735                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
736                            *shutdowns += 1;
737                        }
738                        supervised.lock().unwrap().push(supervisors);
739                        (false,context)
740                    },
741                    _ = join_all(finalizers) => {
742                        // Check supervisors for faults activity
743                        let supervised = supervised.lock().unwrap();
744                        for supervisors in supervised.iter() {
745                            for (_, supervisor) in supervisors.iter() {
746                                let faults = supervisor.faults.lock().unwrap();
747                                assert!(faults.is_empty());
748                            }
749                        }
750                        (true,context)
751                    }
752                }
753            };
754
755            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
756                deterministic::Runner::from(prev_ctx)
757            } else {
758                deterministic::Runner::timed(Duration::from_secs(30))
759            }
760            .start(f);
761
762            // If we are done, break
763            if complete {
764                break;
765            }
766
767            prev_ctx = Some(context.recover());
768        }
769    }
770
771    #[test_traced]
772    fn test_backfill() {
773        // Create context
774        let n = 4;
775        let required_containers = 100;
776        let activity_timeout = 10;
777        let skip_timeout = 5;
778        let namespace = b"consensus".to_vec();
779        let executor = deterministic::Runner::timed(Duration::from_secs(360));
780        executor.start(|context| async move {
781            // Create simulated network
782            let (network, mut oracle) = Network::new(
783                context.with_label("network"),
784                Config {
785                    max_size: 1024 * 1024,
786                },
787            );
788
789            // Start network
790            network.start();
791
792            // Register participants
793            let mut schemes = Vec::new();
794            let mut validators = Vec::new();
795            for i in 0..n {
796                let scheme = PrivateKey::from_seed(i as u64);
797                let pk = scheme.public_key();
798                schemes.push(scheme);
799                validators.push(pk);
800            }
801            validators.sort();
802            schemes.sort_by_key(|s| s.public_key());
803            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
804            let mut registrations = register_validators(&mut oracle, &validators).await;
805
806            // Link all validators except first
807            let link = Link {
808                latency: Duration::from_millis(10),
809                jitter: Duration::from_millis(1),
810                success_rate: 1.0,
811            };
812            link_validators(
813                &mut oracle,
814                &validators,
815                Action::Link(link),
816                Some(|_, i, j| ![i, j].contains(&0usize)),
817            )
818            .await;
819
820            // Create engines
821            let relay = Arc::new(mocks::relay::Relay::new());
822            let mut supervisors = Vec::new();
823            let mut engine_handlers = Vec::new();
824            for (idx_scheme, scheme) in schemes.iter().enumerate() {
825                // Skip first peer
826                if idx_scheme == 0 {
827                    continue;
828                }
829
830                // Create scheme context
831                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
832
833                // Start engine
834                let validator = scheme.public_key();
835                let supervisor_config = mocks::supervisor::Config {
836                    namespace: namespace.clone(),
837                    participants: view_validators.clone(),
838                };
839                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
840                    supervisor_config,
841                );
842                supervisors.push(supervisor.clone());
843                let application_cfg = mocks::application::Config {
844                    hasher: Sha256::default(),
845                    relay: relay.clone(),
846                    participant: validator.clone(),
847                    propose_latency: (10.0, 5.0),
848                    verify_latency: (10.0, 5.0),
849                };
850                let (actor, application) = mocks::application::Application::new(
851                    context.with_label("application"),
852                    application_cfg,
853                );
854                actor.start();
855                let cfg = config::Config {
856                    crypto: scheme.clone(),
857                    automaton: application.clone(),
858                    relay: application.clone(),
859                    reporter: supervisor.clone(),
860                    supervisor,
861                    partition: validator.to_string(),
862                    mailbox_size: 1024,
863                    namespace: namespace.clone(),
864                    leader_timeout: Duration::from_secs(1),
865                    notarization_timeout: Duration::from_secs(2),
866                    nullify_retry: Duration::from_secs(10),
867                    fetch_timeout: Duration::from_secs(1),
868                    activity_timeout,
869                    skip_timeout,
870                    max_fetch_count: 1, // force many fetches
871                    max_participants: n as usize,
872                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
873                    fetch_concurrent: 1,
874                    replay_buffer: NZUsize!(1024 * 1024),
875                    write_buffer: NZUsize!(1024 * 1024),
876                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
877                };
878                let (voter, resolver) = registrations
879                    .remove(&validator)
880                    .expect("validator should be registered");
881                let engine = Engine::new(context.with_label("engine"), cfg);
882                engine_handlers.push(engine.start(voter, resolver));
883            }
884
885            // Wait for all engines to finish
886            let mut finalizers = Vec::new();
887            for supervisor in supervisors.iter_mut() {
888                let (mut latest, mut monitor) = supervisor.subscribe().await;
889                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
890                    while latest < required_containers {
891                        latest = monitor.next().await.expect("event missing");
892                    }
893                }));
894            }
895            join_all(finalizers).await;
896
897            // Degrade network connections for online peers
898            let link = Link {
899                latency: Duration::from_secs(3),
900                jitter: Duration::from_millis(0),
901                success_rate: 1.0,
902            };
903            link_validators(
904                &mut oracle,
905                &validators,
906                Action::Update(link.clone()),
907                Some(|_, i, j| ![i, j].contains(&0usize)),
908            )
909            .await;
910
911            // Wait for nullifications to accrue
912            context.sleep(Duration::from_secs(120)).await;
913
914            // Unlink second peer from all (except first)
915            link_validators(
916                &mut oracle,
917                &validators,
918                Action::Unlink,
919                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
920            )
921            .await;
922
923            // Start engine for first peer
924            let scheme = schemes[0].clone();
925            let validator = scheme.public_key();
926
927            // Create scheme context
928            let context = context.with_label(&format!("validator-{}", scheme.public_key()));
929
930            // Link first peer to all (except second)
931            link_validators(
932                &mut oracle,
933                &validators,
934                Action::Link(link),
935                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
936            )
937            .await;
938
939            // Restore network connections for all online peers
940            let link = Link {
941                latency: Duration::from_millis(10),
942                jitter: Duration::from_millis(3),
943                success_rate: 1.0,
944            };
945            link_validators(
946                &mut oracle,
947                &validators,
948                Action::Update(link),
949                Some(|_, i, j| ![i, j].contains(&1usize)),
950            )
951            .await;
952
953            // Start engine
954            let supervisor_config = mocks::supervisor::Config {
955                namespace: namespace.clone(),
956                participants: view_validators.clone(),
957            };
958            let mut supervisor =
959                mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
960            supervisors.push(supervisor.clone());
961            let application_cfg = mocks::application::Config {
962                hasher: Sha256::default(),
963                relay: relay.clone(),
964                participant: validator.clone(),
965                propose_latency: (10.0, 5.0),
966                verify_latency: (10.0, 5.0),
967            };
968            let (actor, application) = mocks::application::Application::new(
969                context.with_label("application"),
970                application_cfg,
971            );
972            actor.start();
973            let cfg = config::Config {
974                crypto: scheme,
975                automaton: application.clone(),
976                relay: application.clone(),
977                reporter: supervisor.clone(),
978                supervisor: supervisor.clone(),
979                partition: validator.to_string(),
980                mailbox_size: 1024,
981                namespace: namespace.clone(),
982                leader_timeout: Duration::from_secs(1),
983                notarization_timeout: Duration::from_secs(2),
984                nullify_retry: Duration::from_secs(10),
985                fetch_timeout: Duration::from_secs(1),
986                activity_timeout,
987                skip_timeout,
988                max_fetch_count: 1,
989                max_participants: n as usize,
990                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
991                fetch_concurrent: 1,
992                replay_buffer: NZUsize!(1024 * 1024),
993                write_buffer: NZUsize!(1024 * 1024),
994                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
995            };
996            let (voter, resolver) = registrations
997                .remove(&validator)
998                .expect("validator should be registered");
999            let engine = Engine::new(context.with_label("engine"), cfg);
1000            engine_handlers.push(engine.start(voter, resolver));
1001
1002            // Wait for new engine to finalize required
1003            let (mut latest, mut monitor) = supervisor.subscribe().await;
1004            while latest < required_containers {
1005                latest = monitor.next().await.expect("event missing");
1006            }
1007        });
1008    }
1009
1010    #[test_traced]
1011    fn test_one_offline() {
1012        // Create context
1013        let n = 5;
1014        let threshold = quorum(n);
1015        let required_containers = 100;
1016        let activity_timeout = 10;
1017        let skip_timeout = 5;
1018        let namespace = b"consensus".to_vec();
1019        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1020        executor.start(|context| async move {
1021            // Create simulated network
1022            let (network, mut oracle) = Network::new(
1023                context.with_label("network"),
1024                Config {
1025                    max_size: 1024 * 1024,
1026                },
1027            );
1028
1029            // Start network
1030            network.start();
1031
1032            // Register participants
1033            let mut schemes = Vec::new();
1034            let mut validators = Vec::new();
1035            for i in 0..n {
1036                let scheme = PrivateKey::from_seed(i as u64);
1037                let pk = scheme.public_key();
1038                schemes.push(scheme);
1039                validators.push(pk);
1040            }
1041            validators.sort();
1042            schemes.sort_by_key(|s| s.public_key());
1043            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1044            let mut registrations = register_validators(&mut oracle, &validators).await;
1045
1046            // Link all validators except first
1047            let link = Link {
1048                latency: Duration::from_millis(10),
1049                jitter: Duration::from_millis(1),
1050                success_rate: 1.0,
1051            };
1052            link_validators(
1053                &mut oracle,
1054                &validators,
1055                Action::Link(link),
1056                Some(|_, i, j| ![i, j].contains(&0usize)),
1057            )
1058            .await;
1059
1060            // Create engines
1061            let relay = Arc::new(mocks::relay::Relay::new());
1062            let mut supervisors = Vec::new();
1063            let mut engine_handlers = Vec::new();
1064            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1065                // Skip first peer
1066                if idx_scheme == 0 {
1067                    continue;
1068                }
1069
1070                // Create scheme context
1071                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1072
1073                // Start engine
1074                let validator = scheme.public_key();
1075                let supervisor_config = mocks::supervisor::Config {
1076                    namespace: namespace.clone(),
1077                    participants: view_validators.clone(),
1078                };
1079                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1080                    supervisor_config,
1081                );
1082                supervisors.push(supervisor.clone());
1083                let application_cfg = mocks::application::Config {
1084                    hasher: Sha256::default(),
1085                    relay: relay.clone(),
1086                    participant: validator.clone(),
1087                    propose_latency: (10.0, 5.0),
1088                    verify_latency: (10.0, 5.0),
1089                };
1090                let (actor, application) = mocks::application::Application::new(
1091                    context.with_label("application"),
1092                    application_cfg,
1093                );
1094                actor.start();
1095                let cfg = config::Config {
1096                    crypto: scheme,
1097                    automaton: application.clone(),
1098                    relay: application.clone(),
1099                    reporter: supervisor.clone(),
1100                    partition: validator.to_string(),
1101                    supervisor,
1102                    mailbox_size: 1024,
1103                    namespace: namespace.clone(),
1104                    leader_timeout: Duration::from_secs(1),
1105                    notarization_timeout: Duration::from_secs(2),
1106                    nullify_retry: Duration::from_secs(10),
1107                    fetch_timeout: Duration::from_secs(1),
1108                    activity_timeout,
1109                    skip_timeout,
1110                    max_participants: n as usize,
1111                    max_fetch_count: 1,
1112                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1113                    fetch_concurrent: 1,
1114                    replay_buffer: NZUsize!(1024 * 1024),
1115                    write_buffer: NZUsize!(1024 * 1024),
1116                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1117                };
1118                let (voter, resolver) = registrations
1119                    .remove(&validator)
1120                    .expect("validator should be registered");
1121                let engine = Engine::new(context.with_label("engine"), cfg);
1122                engine_handlers.push(engine.start(voter, resolver));
1123            }
1124
1125            // Wait for all engines to finish
1126            let mut finalizers = Vec::new();
1127            for supervisor in supervisors.iter_mut() {
1128                let (mut latest, mut monitor) = supervisor.subscribe().await;
1129                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1130                    while latest < required_containers {
1131                        latest = monitor.next().await.expect("event missing");
1132                    }
1133                }));
1134            }
1135            join_all(finalizers).await;
1136
1137            // Check supervisors for correct activity
1138            let offline = &validators[0];
1139            for supervisor in supervisors.iter() {
1140                // Ensure no faults
1141                {
1142                    let faults = supervisor.faults.lock().unwrap();
1143                    assert!(faults.is_empty());
1144                }
1145
1146                // Ensure offline node is never active
1147                {
1148                    let notarizes = supervisor.notarizes.lock().unwrap();
1149                    for (view, payloads) in notarizes.iter() {
1150                        for (_, participants) in payloads.iter() {
1151                            if participants.contains(offline) {
1152                                panic!("view: {view}");
1153                            }
1154                        }
1155                    }
1156                }
1157                {
1158                    let nullifies = supervisor.nullifies.lock().unwrap();
1159                    for (view, participants) in nullifies.iter() {
1160                        if participants.contains(offline) {
1161                            panic!("view: {view}");
1162                        }
1163                    }
1164                }
1165                {
1166                    let finalizes = supervisor.finalizes.lock().unwrap();
1167                    for (view, payloads) in finalizes.iter() {
1168                        for (_, finalizers) in payloads.iter() {
1169                            if finalizers.contains(offline) {
1170                                panic!("view: {view}");
1171                            }
1172                        }
1173                    }
1174                }
1175
1176                // Identify offline views
1177                let mut offline_views = Vec::new();
1178                {
1179                    let leaders = supervisor.leaders.lock().unwrap();
1180                    for (view, leader) in leaders.iter() {
1181                        if leader == offline {
1182                            offline_views.push(*view);
1183                        }
1184                    }
1185                }
1186                assert!(!offline_views.is_empty());
1187
1188                // Ensure nullifies/nullification collected for offline node
1189                {
1190                    let nullifies = supervisor.nullifies.lock().unwrap();
1191                    for view in offline_views.iter() {
1192                        let nullifies = nullifies.get(view).unwrap();
1193                        if nullifies.len() < threshold as usize {
1194                            panic!("view: {view}");
1195                        }
1196                    }
1197                }
1198                {
1199                    let nullifications = supervisor.nullifications.lock().unwrap();
1200                    for view in offline_views.iter() {
1201                        nullifications.get(view).unwrap();
1202                    }
1203                }
1204            }
1205
1206            // Ensure we are skipping views
1207            let encoded = context.encode();
1208            let lines = encoded.lines();
1209            let mut skipped_views = 0;
1210            let mut nodes_skipping = 0;
1211            for line in lines {
1212                if line.contains("_engine_voter_skipped_views_total") {
1213                    let parts: Vec<&str> = line.split_whitespace().collect();
1214                    if let Some(number_str) = parts.last() {
1215                        if let Ok(number) = number_str.parse::<u64>() {
1216                            if number > 0 {
1217                                nodes_skipping += 1;
1218                            }
1219                            if number > skipped_views {
1220                                skipped_views = number;
1221                            }
1222                        }
1223                    }
1224                }
1225            }
1226            assert!(
1227                skipped_views > 0,
1228                "expected skipped views to be greater than 0"
1229            );
1230            assert_eq!(
1231                nodes_skipping,
1232                n - 1,
1233                "expected all online nodes to be skipping views"
1234            );
1235        });
1236    }
1237
1238    #[test_traced]
1239    fn test_slow_validator() {
1240        // Create context
1241        let n = 5;
1242        let required_containers = 50;
1243        let activity_timeout = 10;
1244        let skip_timeout = 5;
1245        let namespace = b"consensus".to_vec();
1246        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1247        executor.start(|context| async move {
1248            // Create simulated network
1249            let (network, mut oracle) = Network::new(
1250                context.with_label("network"),
1251                Config {
1252                    max_size: 1024 * 1024,
1253                },
1254            );
1255
1256            // Start network
1257            network.start();
1258
1259            // Register participants
1260            let mut schemes = Vec::new();
1261            let mut validators = Vec::new();
1262            for i in 0..n {
1263                let scheme = PrivateKey::from_seed(i as u64);
1264                let pk = scheme.public_key();
1265                schemes.push(scheme);
1266                validators.push(pk);
1267            }
1268            validators.sort();
1269            schemes.sort_by_key(|s| s.public_key());
1270            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1271            let mut registrations = register_validators(&mut oracle, &validators).await;
1272
1273            // Link all validators
1274            let link = Link {
1275                latency: Duration::from_millis(10),
1276                jitter: Duration::from_millis(1),
1277                success_rate: 1.0,
1278            };
1279            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1280
1281            // Create engines
1282            let relay = Arc::new(mocks::relay::Relay::new());
1283            let mut supervisors = Vec::new();
1284            let mut engine_handlers = Vec::new();
1285            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1286                // Create scheme context
1287                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1288
1289                // Start engine
1290                let validator = scheme.public_key();
1291                let supervisor_config = mocks::supervisor::Config {
1292                    namespace: namespace.clone(),
1293                    participants: view_validators.clone(),
1294                };
1295                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1296                    supervisor_config,
1297                );
1298                supervisors.push(supervisor.clone());
1299                let application_cfg = if idx_scheme == 0 {
1300                    mocks::application::Config {
1301                        hasher: Sha256::default(),
1302                        relay: relay.clone(),
1303                        participant: validator.clone(),
1304                        propose_latency: (3_000.0, 0.0),
1305                        verify_latency: (3_000.0, 5.0),
1306                    }
1307                } else {
1308                    mocks::application::Config {
1309                        hasher: Sha256::default(),
1310                        relay: relay.clone(),
1311                        participant: validator.clone(),
1312                        propose_latency: (10.0, 5.0),
1313                        verify_latency: (10.0, 5.0),
1314                    }
1315                };
1316                let (actor, application) = mocks::application::Application::new(
1317                    context.with_label("application"),
1318                    application_cfg,
1319                );
1320                actor.start();
1321                let cfg = config::Config {
1322                    crypto: scheme,
1323                    automaton: application.clone(),
1324                    relay: application.clone(),
1325                    reporter: supervisor.clone(),
1326                    partition: validator.to_string(),
1327                    supervisor,
1328                    mailbox_size: 1024,
1329                    namespace: namespace.clone(),
1330                    leader_timeout: Duration::from_secs(1),
1331                    notarization_timeout: Duration::from_secs(2),
1332                    nullify_retry: Duration::from_secs(10),
1333                    fetch_timeout: Duration::from_secs(1),
1334                    activity_timeout,
1335                    skip_timeout,
1336                    max_fetch_count: 1,
1337                    max_participants: n as usize,
1338                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1339                    fetch_concurrent: 1,
1340                    replay_buffer: NZUsize!(1024 * 1024),
1341                    write_buffer: NZUsize!(1024 * 1024),
1342                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1343                };
1344                let (voter, resolver) = registrations
1345                    .remove(&validator)
1346                    .expect("validator should be registered");
1347                let engine = Engine::new(context.with_label("engine"), cfg);
1348                engine_handlers.push(engine.start(voter, resolver));
1349            }
1350
1351            // Wait for all engines to finish
1352            let mut finalizers = Vec::new();
1353            for supervisor in supervisors.iter_mut() {
1354                let (mut latest, mut monitor) = supervisor.subscribe().await;
1355                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1356                    while latest < required_containers {
1357                        latest = monitor.next().await.expect("event missing");
1358                    }
1359                }));
1360            }
1361            join_all(finalizers).await;
1362
1363            // Check supervisors for correct activity
1364            let slow = &validators[0];
1365            for supervisor in supervisors.iter() {
1366                // Ensure no faults
1367                {
1368                    let faults = supervisor.faults.lock().unwrap();
1369                    assert!(faults.is_empty());
1370                }
1371
1372                // Ensure slow node is never active (will never process anything fast enough to nullify)
1373                {
1374                    let notarizes = supervisor.notarizes.lock().unwrap();
1375                    for (view, payloads) in notarizes.iter() {
1376                        for (_, participants) in payloads.iter() {
1377                            if participants.contains(slow) {
1378                                panic!("view: {view}");
1379                            }
1380                        }
1381                    }
1382                }
1383                {
1384                    let nullifies = supervisor.nullifies.lock().unwrap();
1385                    for (view, participants) in nullifies.iter() {
1386                        // Start checking once all are online (leader may never have proposed)
1387                        if *view > 10 && participants.contains(slow) {
1388                            panic!("view: {view}");
1389                        }
1390                    }
1391                }
1392                {
1393                    let finalizes = supervisor.finalizes.lock().unwrap();
1394                    for (view, payloads) in finalizes.iter() {
1395                        for (_, finalizers) in payloads.iter() {
1396                            if finalizers.contains(slow) {
1397                                panic!("view: {view}");
1398                            }
1399                        }
1400                    }
1401                }
1402            }
1403        });
1404    }
1405
1406    #[test_traced]
1407    fn test_all_recovery() {
1408        // Create context
1409        let n = 5;
1410        let required_containers = 100;
1411        let activity_timeout = 10;
1412        let skip_timeout = 3;
1413        let namespace = b"consensus".to_vec();
1414        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1415        executor.start(|context| async move {
1416            // Create simulated network
1417            let (network, mut oracle) = Network::new(
1418                context.with_label("network"),
1419                Config {
1420                    max_size: 1024 * 1024,
1421                },
1422            );
1423
1424            // Start network
1425            network.start();
1426
1427            // Register participants
1428            let mut schemes = Vec::new();
1429            let mut validators = Vec::new();
1430            for i in 0..n {
1431                let scheme = PrivateKey::from_seed(i as u64);
1432                let pk = scheme.public_key();
1433                schemes.push(scheme);
1434                validators.push(pk);
1435            }
1436            validators.sort();
1437            schemes.sort_by_key(|s| s.public_key());
1438            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1439            let mut registrations = register_validators(&mut oracle, &validators).await;
1440
1441            // Link all validators
1442            let link = Link {
1443                latency: Duration::from_secs(3),
1444                jitter: Duration::from_millis(0),
1445                success_rate: 1.0,
1446            };
1447            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1448
1449            // Create engines
1450            let relay = Arc::new(mocks::relay::Relay::new());
1451            let mut supervisors = Vec::new();
1452            let mut engine_handlers = Vec::new();
1453            for scheme in schemes.iter() {
1454                // Create scheme context
1455                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1456
1457                // Start engine
1458                let validator = scheme.public_key();
1459                let supervisor_config = mocks::supervisor::Config {
1460                    namespace: namespace.clone(),
1461                    participants: view_validators.clone(),
1462                };
1463                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1464                    supervisor_config,
1465                );
1466                supervisors.push(supervisor.clone());
1467                let application_cfg = mocks::application::Config {
1468                    hasher: Sha256::default(),
1469                    relay: relay.clone(),
1470                    participant: validator.clone(),
1471                    propose_latency: (10.0, 5.0),
1472                    verify_latency: (10.0, 5.0),
1473                };
1474                let (actor, application) = mocks::application::Application::new(
1475                    context.with_label("application"),
1476                    application_cfg,
1477                );
1478                actor.start();
1479                let cfg = config::Config {
1480                    crypto: scheme.clone(),
1481                    automaton: application.clone(),
1482                    relay: application.clone(),
1483                    reporter: supervisor.clone(),
1484                    partition: validator.to_string(),
1485                    supervisor,
1486                    mailbox_size: 1024,
1487                    namespace: namespace.clone(),
1488                    leader_timeout: Duration::from_secs(1),
1489                    notarization_timeout: Duration::from_secs(2),
1490                    nullify_retry: Duration::from_secs(10),
1491                    fetch_timeout: Duration::from_secs(1),
1492                    activity_timeout,
1493                    skip_timeout,
1494                    max_fetch_count: 1,
1495                    max_participants: n as usize,
1496                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1497                    fetch_concurrent: 1,
1498                    replay_buffer: NZUsize!(1024 * 1024),
1499                    write_buffer: NZUsize!(1024 * 1024),
1500                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1501                };
1502                let (voter, resolver) = registrations
1503                    .remove(&validator)
1504                    .expect("validator should be registered");
1505                let engine = Engine::new(context.with_label("engine"), cfg);
1506                engine_handlers.push(engine.start(voter, resolver));
1507            }
1508
1509            // Wait for a few virtual minutes (shouldn't finalize anything)
1510            let mut finalizers = Vec::new();
1511            for supervisor in supervisors.iter_mut() {
1512                let (_, mut monitor) = supervisor.subscribe().await;
1513                finalizers.push(
1514                    context
1515                        .with_label("finalizer")
1516                        .spawn(move |context| async move {
1517                            select! {
1518                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1519                                _done = monitor.next() => {
1520                                    panic!("engine should not notarize or finalize anything");
1521                                }
1522                            }
1523                        }),
1524                );
1525            }
1526            join_all(finalizers).await;
1527
1528            // Unlink all validators to get latest view
1529            link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1530
1531            // Wait for a virtual minute (nothing should happen)
1532            context.sleep(Duration::from_secs(60)).await;
1533
1534            // Get latest view
1535            let mut latest = 0;
1536            for supervisor in supervisors.iter() {
1537                let nullifies = supervisor.nullifies.lock().unwrap();
1538                let max = nullifies.keys().max().unwrap();
1539                if *max > latest {
1540                    latest = *max;
1541                }
1542            }
1543
1544            // Update links
1545            let link = Link {
1546                latency: Duration::from_millis(10),
1547                jitter: Duration::from_millis(1),
1548                success_rate: 1.0,
1549            };
1550            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1551
1552            // Wait for all engines to finish
1553            let mut finalizers = Vec::new();
1554            for supervisor in supervisors.iter_mut() {
1555                let (mut latest, mut monitor) = supervisor.subscribe().await;
1556                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1557                    while latest < required_containers {
1558                        latest = monitor.next().await.expect("event missing");
1559                    }
1560                }));
1561            }
1562            join_all(finalizers).await;
1563
1564            // Check supervisors for correct activity
1565            for supervisor in supervisors.iter() {
1566                // Ensure no faults
1567                {
1568                    let faults = supervisor.faults.lock().unwrap();
1569                    assert!(faults.is_empty());
1570                }
1571
1572                // Ensure quick recovery.
1573                //
1574                // If the skip timeout isn't implemented correctly, we may go many views before participants
1575                // start to consider a validator's proposal.
1576                {
1577                    // Ensure nearly all views around latest finalize
1578                    let mut found = 0;
1579                    let finalizations = supervisor.finalizations.lock().unwrap();
1580                    for i in latest..latest + activity_timeout {
1581                        if finalizations.contains_key(&i) {
1582                            found += 1;
1583                        }
1584                    }
1585                    assert!(found >= activity_timeout - 2, "found: {found}");
1586                }
1587            }
1588        });
1589    }
1590
1591    #[test_traced]
1592    #[ignore]
1593    fn test_partition() {
1594        // Create context
1595        let n = 10;
1596        let required_containers = 50;
1597        let activity_timeout = 10;
1598        let skip_timeout = 5;
1599        let namespace = b"consensus".to_vec();
1600        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1601        executor.start(|context| async move {
1602            // Create simulated network
1603            let (network, mut oracle) = Network::new(
1604                context.with_label("network"),
1605                Config {
1606                    max_size: 1024 * 1024,
1607                },
1608            );
1609
1610            // Start network
1611            network.start();
1612
1613            // Register participants
1614            let mut schemes = Vec::new();
1615            let mut validators = Vec::new();
1616            for i in 0..n {
1617                let scheme = PrivateKey::from_seed(i as u64);
1618                let pk = scheme.public_key();
1619                schemes.push(scheme);
1620                validators.push(pk);
1621            }
1622            validators.sort();
1623            schemes.sort_by_key(|s| s.public_key());
1624            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1625            let mut registrations = register_validators(&mut oracle, &validators).await;
1626
1627            // Link all validators
1628            let link = Link {
1629                latency: Duration::from_millis(10),
1630                jitter: Duration::from_millis(1),
1631                success_rate: 1.0,
1632            };
1633            link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1634
1635            // Create engines
1636            let relay = Arc::new(mocks::relay::Relay::new());
1637            let mut supervisors = Vec::new();
1638            let mut engine_handlers = Vec::new();
1639            for scheme in schemes.iter() {
1640                // Start engine
1641                let validator = scheme.public_key();
1642                let supervisor_config = mocks::supervisor::Config {
1643                    namespace: namespace.clone(),
1644                    participants: view_validators.clone(),
1645                };
1646                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1647                    supervisor_config,
1648                );
1649                supervisors.push(supervisor.clone());
1650                let application_cfg = mocks::application::Config {
1651                    hasher: Sha256::default(),
1652                    relay: relay.clone(),
1653                    participant: validator.clone(),
1654                    propose_latency: (10.0, 5.0),
1655                    verify_latency: (10.0, 5.0),
1656                };
1657                let (actor, application) = mocks::application::Application::new(
1658                    context.with_label("application"),
1659                    application_cfg,
1660                );
1661                actor.start();
1662                let cfg = config::Config {
1663                    crypto: scheme.clone(),
1664                    automaton: application.clone(),
1665                    relay: application.clone(),
1666                    reporter: supervisor.clone(),
1667                    partition: validator.to_string(),
1668                    supervisor,
1669                    mailbox_size: 1024,
1670                    namespace: namespace.clone(),
1671                    leader_timeout: Duration::from_secs(1),
1672                    notarization_timeout: Duration::from_secs(2),
1673                    nullify_retry: Duration::from_secs(10),
1674                    fetch_timeout: Duration::from_secs(1),
1675                    activity_timeout,
1676                    skip_timeout,
1677                    max_fetch_count: 1,
1678                    max_participants: n as usize,
1679                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1680                    fetch_concurrent: 1,
1681                    replay_buffer: NZUsize!(1024 * 1024),
1682                    write_buffer: NZUsize!(1024 * 1024),
1683                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1684                };
1685                let (voter, resolver) = registrations
1686                    .remove(&validator)
1687                    .expect("validator should be registered");
1688                let engine = Engine::new(context.with_label("engine"), cfg);
1689                engine_handlers.push(engine.start(voter, resolver));
1690            }
1691
1692            // Wait for all engines to finish
1693            let mut finalizers = Vec::new();
1694            for supervisor in supervisors.iter_mut() {
1695                let (mut latest, mut monitor) = supervisor.subscribe().await;
1696                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1697                    while latest < required_containers {
1698                        latest = monitor.next().await.expect("event missing");
1699                    }
1700                }));
1701            }
1702            join_all(finalizers).await;
1703
1704            // Cut all links between validator halves
1705            fn separated(n: usize, a: usize, b: usize) -> bool {
1706                let m = n / 2;
1707                (a < m && b >= m) || (a >= m && b < m)
1708            }
1709            link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1710
1711            // Wait for any in-progress notarizations/finalizations to finish
1712            context.sleep(Duration::from_secs(10)).await;
1713
1714            // Wait for a few virtual minutes (shouldn't finalize anything)
1715            let mut finalizers = Vec::new();
1716            for supervisor in supervisors.iter_mut() {
1717                let (_, mut monitor) = supervisor.subscribe().await;
1718                finalizers.push(
1719                    context
1720                        .with_label("finalizer")
1721                        .spawn(move |context| async move {
1722                            select! {
1723                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1724                                _done = monitor.next() => {
1725                                    panic!("engine should not notarize or finalize anything");
1726                                }
1727                            }
1728                        }),
1729                );
1730            }
1731            join_all(finalizers).await;
1732
1733            // Restore links
1734            link_validators(
1735                &mut oracle,
1736                &validators,
1737                Action::Link(link),
1738                Some(separated),
1739            )
1740            .await;
1741
1742            // Wait for all engines to finish
1743            let mut finalizers = Vec::new();
1744            for supervisor in supervisors.iter_mut() {
1745                let (mut latest, mut monitor) = supervisor.subscribe().await;
1746                let required = latest + required_containers;
1747                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1748                    while latest < required {
1749                        latest = monitor.next().await.expect("event missing");
1750                    }
1751                }));
1752            }
1753            join_all(finalizers).await;
1754
1755            // Check supervisors for correct activity
1756            for supervisor in supervisors.iter() {
1757                // Ensure no faults
1758                {
1759                    let faults = supervisor.faults.lock().unwrap();
1760                    assert!(faults.is_empty());
1761                }
1762            }
1763        });
1764    }
1765
1766    fn slow_and_lossy_links(seed: u64) -> String {
1767        // Create context
1768        let n = 5;
1769        let required_containers = 50;
1770        let activity_timeout = 10;
1771        let skip_timeout = 5;
1772        let namespace = b"consensus".to_vec();
1773        let cfg = deterministic::Config::new()
1774            .with_seed(seed)
1775            .with_timeout(Some(Duration::from_secs(5_000)));
1776        let executor = deterministic::Runner::new(cfg);
1777        executor.start(|context| async move {
1778            // Create simulated network
1779            let (network, mut oracle) = Network::new(
1780                context.with_label("network"),
1781                Config {
1782                    max_size: 1024 * 1024,
1783                },
1784            );
1785
1786            // Start network
1787            network.start();
1788
1789            // Register participants
1790            let mut schemes = Vec::new();
1791            let mut validators = Vec::new();
1792            for i in 0..n {
1793                let scheme = PrivateKey::from_seed(i as u64);
1794                let pk = scheme.public_key();
1795                schemes.push(scheme);
1796                validators.push(pk);
1797            }
1798            validators.sort();
1799            schemes.sort_by_key(|s| s.public_key());
1800            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1801            let mut registrations = register_validators(&mut oracle, &validators).await;
1802
1803            // Link all validators
1804            let degraded_link = Link {
1805                latency: Duration::from_millis(200),
1806                jitter: Duration::from_millis(150),
1807                success_rate: 0.5,
1808            };
1809            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1810
1811            // Create engines
1812            let relay = Arc::new(mocks::relay::Relay::new());
1813            let mut supervisors = Vec::new();
1814            let mut engine_handlers = Vec::new();
1815            for scheme in schemes.into_iter() {
1816                // Create scheme context
1817                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1818
1819                // Start engine
1820                let validator = scheme.public_key();
1821                let supervisor_config = mocks::supervisor::Config {
1822                    namespace: namespace.clone(),
1823                    participants: view_validators.clone(),
1824                };
1825                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1826                    supervisor_config,
1827                );
1828                supervisors.push(supervisor.clone());
1829                let application_cfg = mocks::application::Config {
1830                    hasher: Sha256::default(),
1831                    relay: relay.clone(),
1832                    participant: validator.clone(),
1833                    propose_latency: (10.0, 5.0),
1834                    verify_latency: (10.0, 5.0),
1835                };
1836                let (actor, application) = mocks::application::Application::new(
1837                    context.with_label("application"),
1838                    application_cfg,
1839                );
1840                actor.start();
1841                let cfg = config::Config {
1842                    crypto: scheme,
1843                    automaton: application.clone(),
1844                    relay: application.clone(),
1845                    reporter: supervisor.clone(),
1846                    partition: validator.to_string(),
1847                    supervisor,
1848                    mailbox_size: 1024,
1849                    namespace: namespace.clone(),
1850                    leader_timeout: Duration::from_secs(1),
1851                    notarization_timeout: Duration::from_secs(2),
1852                    nullify_retry: Duration::from_secs(10),
1853                    fetch_timeout: Duration::from_secs(1),
1854                    activity_timeout,
1855                    skip_timeout,
1856                    max_fetch_count: 1,
1857                    max_participants: n as usize,
1858                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1859                    fetch_concurrent: 1,
1860                    replay_buffer: NZUsize!(1024 * 1024),
1861                    write_buffer: NZUsize!(1024 * 1024),
1862                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1863                };
1864                let (voter, resolver) = registrations
1865                    .remove(&validator)
1866                    .expect("validator should be registered");
1867                let engine = Engine::new(context.with_label("engine"), cfg);
1868                engine_handlers.push(engine.start(voter, resolver));
1869            }
1870
1871            // Wait for all engines to finish
1872            let mut finalizers = Vec::new();
1873            for supervisor in supervisors.iter_mut() {
1874                let (mut latest, mut monitor) = supervisor.subscribe().await;
1875                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1876                    while latest < required_containers {
1877                        latest = monitor.next().await.expect("event missing");
1878                    }
1879                }));
1880            }
1881            join_all(finalizers).await;
1882
1883            // Check supervisors for correct activity
1884            for supervisor in supervisors.iter() {
1885                // Ensure no faults
1886                {
1887                    let faults = supervisor.faults.lock().unwrap();
1888                    assert!(faults.is_empty());
1889                }
1890            }
1891
1892            context.auditor().state()
1893        })
1894    }
1895
1896    #[test_traced]
1897    fn test_slow_and_lossy_links() {
1898        slow_and_lossy_links(0);
1899    }
1900
1901    #[test_traced]
1902    #[ignore]
1903    fn test_determinism() {
1904        // We use slow and lossy links as the deterministic test
1905        // because it is the most complex test.
1906        for seed in 1..6 {
1907            // Run test with seed
1908            let state_1 = slow_and_lossy_links(seed);
1909
1910            // Run test again with same seed
1911            let state_2 = slow_and_lossy_links(seed);
1912
1913            // Ensure states are equal
1914            assert_eq!(state_1, state_2);
1915        }
1916    }
1917
1918    fn conflicter(seed: u64) {
1919        // Create context
1920        let n = 4;
1921        let required_containers = 50;
1922        let activity_timeout = 10;
1923        let skip_timeout = 5;
1924        let namespace = b"consensus".to_vec();
1925        let cfg = deterministic::Config::new()
1926            .with_seed(seed)
1927            .with_timeout(Some(Duration::from_secs(30)));
1928        let executor = deterministic::Runner::new(cfg);
1929        executor.start(|context| async move {
1930            // Create simulated network
1931            let (network, mut oracle) = Network::new(
1932                context.with_label("network"),
1933                Config {
1934                    max_size: 1024 * 1024,
1935                },
1936            );
1937
1938            // Start network
1939            network.start();
1940
1941            // Register participants
1942            let mut schemes = Vec::new();
1943            let mut validators = Vec::new();
1944            for i in 0..n {
1945                let scheme = PrivateKey::from_seed(i as u64);
1946                let pk = scheme.public_key();
1947                schemes.push(scheme);
1948                validators.push(pk);
1949            }
1950            validators.sort();
1951            schemes.sort_by_key(|s| s.public_key());
1952            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1953            let mut registrations = register_validators(&mut oracle, &validators).await;
1954
1955            // Link all validators
1956            let link = Link {
1957                latency: Duration::from_millis(10),
1958                jitter: Duration::from_millis(1),
1959                success_rate: 1.0,
1960            };
1961            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1962
1963            // Create engines
1964            let relay = Arc::new(mocks::relay::Relay::new());
1965            let mut supervisors = Vec::new();
1966            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1967                // Create scheme context
1968                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1969
1970                // Start engine
1971                let validator = scheme.public_key();
1972                let supervisor_config = mocks::supervisor::Config {
1973                    namespace: namespace.clone(),
1974                    participants: view_validators.clone(),
1975                };
1976                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1977                    supervisor_config,
1978                );
1979                if idx_scheme == 0 {
1980                    let cfg = mocks::conflicter::Config {
1981                        crypto: scheme,
1982                        supervisor,
1983                        namespace: namespace.clone(),
1984                    };
1985                    let (voter, _) = registrations
1986                        .remove(&validator)
1987                        .expect("validator should be registered");
1988                    let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1989                        mocks::conflicter::Conflicter::new(
1990                            context.with_label("byzantine_engine"),
1991                            cfg,
1992                        );
1993                    engine.start(voter);
1994                } else {
1995                    supervisors.push(supervisor.clone());
1996                    let application_cfg = mocks::application::Config {
1997                        hasher: Sha256::default(),
1998                        relay: relay.clone(),
1999                        participant: validator.clone(),
2000                        propose_latency: (10.0, 5.0),
2001                        verify_latency: (10.0, 5.0),
2002                    };
2003                    let (actor, application) = mocks::application::Application::new(
2004                        context.with_label("application"),
2005                        application_cfg,
2006                    );
2007                    actor.start();
2008                    let cfg = config::Config {
2009                        crypto: scheme,
2010                        automaton: application.clone(),
2011                        relay: application.clone(),
2012                        reporter: supervisor.clone(),
2013                        partition: validator.to_string(),
2014                        supervisor,
2015                        mailbox_size: 1024,
2016                        namespace: namespace.clone(),
2017                        leader_timeout: Duration::from_secs(1),
2018                        notarization_timeout: Duration::from_secs(2),
2019                        nullify_retry: Duration::from_secs(10),
2020                        fetch_timeout: Duration::from_secs(1),
2021                        activity_timeout,
2022                        skip_timeout,
2023                        max_fetch_count: 1,
2024                        max_participants: n as usize,
2025                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2026                        fetch_concurrent: 1,
2027                        replay_buffer: NZUsize!(1024 * 1024),
2028                        write_buffer: NZUsize!(1024 * 1024),
2029                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2030                    };
2031                    let (voter, resolver) = registrations
2032                        .remove(&validator)
2033                        .expect("validator should be registered");
2034                    let engine = Engine::new(context.with_label("engine"), cfg);
2035                    engine.start(voter, resolver);
2036                }
2037            }
2038
2039            // Wait for all engines to finish
2040            let mut finalizers = Vec::new();
2041            for supervisor in supervisors.iter_mut() {
2042                let (mut latest, mut monitor) = supervisor.subscribe().await;
2043                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2044                    while latest < required_containers {
2045                        latest = monitor.next().await.expect("event missing");
2046                    }
2047                }));
2048            }
2049            join_all(finalizers).await;
2050
2051            // Check supervisors for correct activity
2052            let byz = &validators[0];
2053            let mut count_conflicting_notarize = 0;
2054            let mut count_conflicting_finalize = 0;
2055            for supervisor in supervisors.iter() {
2056                // Ensure only faults for byz
2057                {
2058                    let faults = supervisor.faults.lock().unwrap();
2059                    assert_eq!(faults.len(), 1);
2060                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2061                    for (_, faults) in faulter.iter() {
2062                        for fault in faults.iter() {
2063                            match fault {
2064                                Activity::ConflictingNotarize(_) => {
2065                                    count_conflicting_notarize += 1;
2066                                }
2067                                Activity::ConflictingFinalize(_) => {
2068                                    count_conflicting_finalize += 1;
2069                                }
2070                                _ => panic!("unexpected fault: {fault:?}"),
2071                            }
2072                        }
2073                    }
2074                }
2075            }
2076            assert!(count_conflicting_notarize > 0);
2077            assert!(count_conflicting_finalize > 0);
2078        });
2079    }
2080
2081    #[test_traced]
2082    #[ignore]
2083    fn test_conflicter() {
2084        for seed in 0..5 {
2085            conflicter(seed);
2086        }
2087    }
2088
2089    fn nuller(seed: u64) {
2090        // Create context
2091        let n = 4;
2092        let required_containers = 50;
2093        let activity_timeout = 10;
2094        let skip_timeout = 5;
2095        let namespace = b"consensus".to_vec();
2096        let cfg = deterministic::Config::new()
2097            .with_seed(seed)
2098            .with_timeout(Some(Duration::from_secs(30)));
2099        let executor = deterministic::Runner::new(cfg);
2100        executor.start(|context| async move {
2101            // Create simulated network
2102            let (network, mut oracle) = Network::new(
2103                context.with_label("network"),
2104                Config {
2105                    max_size: 1024 * 1024,
2106                },
2107            );
2108
2109            // Start network
2110            network.start();
2111
2112            // Register participants
2113            let mut schemes = Vec::new();
2114            let mut validators = Vec::new();
2115            for i in 0..n {
2116                let scheme = PrivateKey::from_seed(i as u64);
2117                let pk = scheme.public_key();
2118                schemes.push(scheme);
2119                validators.push(pk);
2120            }
2121            validators.sort();
2122            schemes.sort_by_key(|s| s.public_key());
2123            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2124            let mut registrations = register_validators(&mut oracle, &validators).await;
2125
2126            // Link all validators
2127            let link = Link {
2128                latency: Duration::from_millis(10),
2129                jitter: Duration::from_millis(1),
2130                success_rate: 1.0,
2131            };
2132            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2133
2134            // Create engines
2135            let relay = Arc::new(mocks::relay::Relay::new());
2136            let mut supervisors = Vec::new();
2137            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2138                // Create scheme context
2139                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2140
2141                // Start engine
2142                let validator = scheme.public_key();
2143                let supervisor_config = mocks::supervisor::Config {
2144                    namespace: namespace.clone(),
2145                    participants: view_validators.clone(),
2146                };
2147                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2148                    supervisor_config,
2149                );
2150                if idx_scheme == 0 {
2151                    let cfg = mocks::nuller::Config {
2152                        crypto: scheme,
2153                        supervisor,
2154                        namespace: namespace.clone(),
2155                    };
2156                    let (voter, _) = registrations
2157                        .remove(&validator)
2158                        .expect("validator should be registered");
2159                    let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2160                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2161                    engine.start(voter);
2162                } else {
2163                    supervisors.push(supervisor.clone());
2164                    let application_cfg = mocks::application::Config {
2165                        hasher: Sha256::default(),
2166                        relay: relay.clone(),
2167                        participant: validator.clone(),
2168                        propose_latency: (10.0, 5.0),
2169                        verify_latency: (10.0, 5.0),
2170                    };
2171                    let (actor, application) = mocks::application::Application::new(
2172                        context.with_label("application"),
2173                        application_cfg,
2174                    );
2175                    actor.start();
2176                    let cfg = config::Config {
2177                        crypto: scheme,
2178                        automaton: application.clone(),
2179                        relay: application.clone(),
2180                        reporter: supervisor.clone(),
2181                        partition: validator.to_string(),
2182                        supervisor,
2183                        mailbox_size: 1024,
2184                        namespace: namespace.clone(),
2185                        leader_timeout: Duration::from_secs(1),
2186                        notarization_timeout: Duration::from_secs(2),
2187                        nullify_retry: Duration::from_secs(10),
2188                        fetch_timeout: Duration::from_secs(1),
2189                        activity_timeout,
2190                        skip_timeout,
2191                        max_fetch_count: 1,
2192                        max_participants: n as usize,
2193                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2194                        fetch_concurrent: 1,
2195                        replay_buffer: NZUsize!(1024 * 1024),
2196                        write_buffer: NZUsize!(1024 * 1024),
2197                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2198                    };
2199                    let (voter, resolver) = registrations
2200                        .remove(&validator)
2201                        .expect("validator should be registered");
2202                    let engine = Engine::new(context.with_label("engine"), cfg);
2203                    engine.start(voter, resolver);
2204                }
2205            }
2206
2207            // Wait for all engines to finish
2208            let mut finalizers = Vec::new();
2209            for supervisor in supervisors.iter_mut() {
2210                let (mut latest, mut monitor) = supervisor.subscribe().await;
2211                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2212                    while latest < required_containers {
2213                        latest = monitor.next().await.expect("event missing");
2214                    }
2215                }));
2216            }
2217            join_all(finalizers).await;
2218
2219            // Check supervisors for correct activity
2220            let byz = &validators[0];
2221            let mut count_nullify_and_finalize = 0;
2222            for supervisor in supervisors.iter() {
2223                // Ensure only faults for byz
2224                {
2225                    let faults = supervisor.faults.lock().unwrap();
2226                    assert_eq!(faults.len(), 1);
2227                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2228                    for (_, faults) in faulter.iter() {
2229                        for fault in faults.iter() {
2230                            match fault {
2231                                Activity::NullifyFinalize(_) => {
2232                                    count_nullify_and_finalize += 1;
2233                                }
2234                                _ => panic!("unexpected fault: {fault:?}"),
2235                            }
2236                        }
2237                    }
2238                }
2239            }
2240            assert!(count_nullify_and_finalize > 0);
2241        });
2242    }
2243
2244    #[test_traced]
2245    #[ignore]
2246    fn test_nuller() {
2247        for seed in 0..5 {
2248            nuller(seed);
2249        }
2250    }
2251
2252    fn outdated(seed: u64) {
2253        // Create context
2254        let n = 4;
2255        let required_containers = 100;
2256        let activity_timeout = 10;
2257        let skip_timeout = 5;
2258        let namespace = b"consensus".to_vec();
2259        let cfg = deterministic::Config::new()
2260            .with_seed(seed)
2261            .with_timeout(Some(Duration::from_secs(30)));
2262        let executor = deterministic::Runner::new(cfg);
2263        executor.start(|context| async move {
2264            // Create simulated network
2265            let (network, mut oracle) = Network::new(
2266                context.with_label("network"),
2267                Config {
2268                    max_size: 1024 * 1024,
2269                },
2270            );
2271
2272            // Start network
2273            network.start();
2274
2275            // Register participants
2276            let mut schemes = Vec::new();
2277            let mut validators = Vec::new();
2278            for i in 0..n {
2279                let scheme = PrivateKey::from_seed(i as u64);
2280                let pk = scheme.public_key();
2281                schemes.push(scheme);
2282                validators.push(pk);
2283            }
2284            validators.sort();
2285            schemes.sort_by_key(|s| s.public_key());
2286            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2287            let mut registrations = register_validators(&mut oracle, &validators).await;
2288
2289            // Link all validators
2290            let link = Link {
2291                latency: Duration::from_millis(10),
2292                jitter: Duration::from_millis(1),
2293                success_rate: 1.0,
2294            };
2295            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2296
2297            // Create engines
2298            let relay = Arc::new(mocks::relay::Relay::new());
2299            let mut supervisors = Vec::new();
2300            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2301                // Create scheme context
2302                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2303
2304                // Start engine
2305                let validator = scheme.public_key();
2306                let supervisor_config = mocks::supervisor::Config {
2307                    namespace: namespace.clone(),
2308                    participants: view_validators.clone(),
2309                };
2310                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2311                    supervisor_config,
2312                );
2313                if idx_scheme == 0 {
2314                    let cfg = mocks::outdated::Config {
2315                        crypto: scheme,
2316                        supervisor,
2317                        namespace: namespace.clone(),
2318                        view_delta: activity_timeout * 4,
2319                    };
2320                    let (voter, _) = registrations
2321                        .remove(&validator)
2322                        .expect("validator should be registered");
2323                    let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2324                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2325                    engine.start(voter);
2326                } else {
2327                    supervisors.push(supervisor.clone());
2328                    let application_cfg = mocks::application::Config {
2329                        hasher: Sha256::default(),
2330                        relay: relay.clone(),
2331                        participant: validator.clone(),
2332                        propose_latency: (10.0, 5.0),
2333                        verify_latency: (10.0, 5.0),
2334                    };
2335                    let (actor, application) = mocks::application::Application::new(
2336                        context.with_label("application"),
2337                        application_cfg,
2338                    );
2339                    actor.start();
2340                    let cfg = config::Config {
2341                        crypto: scheme,
2342                        automaton: application.clone(),
2343                        relay: application.clone(),
2344                        reporter: supervisor.clone(),
2345                        supervisor,
2346                        partition: validator.to_string(),
2347                        mailbox_size: 1024,
2348                        namespace: namespace.clone(),
2349                        leader_timeout: Duration::from_secs(1),
2350                        notarization_timeout: Duration::from_secs(2),
2351                        nullify_retry: Duration::from_secs(10),
2352                        fetch_timeout: Duration::from_secs(1),
2353                        activity_timeout,
2354                        skip_timeout,
2355                        max_fetch_count: 1,
2356                        max_participants: n as usize,
2357                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2358                        fetch_concurrent: 1,
2359                        replay_buffer: NZUsize!(1024 * 1024),
2360                        write_buffer: NZUsize!(1024 * 1024),
2361                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2362                    };
2363                    let (voter, resolver) = registrations
2364                        .remove(&validator)
2365                        .expect("validator should be registered");
2366                    let engine = Engine::new(context.with_label("engine"), cfg);
2367                    engine.start(voter, resolver);
2368                }
2369            }
2370
2371            // Wait for all engines to finish
2372            let mut finalizers = Vec::new();
2373            for supervisor in supervisors.iter_mut() {
2374                let (mut latest, mut monitor) = supervisor.subscribe().await;
2375                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2376                    while latest < required_containers {
2377                        latest = monitor.next().await.expect("event missing");
2378                    }
2379                }));
2380            }
2381            join_all(finalizers).await;
2382
2383            // Ensure no faults
2384            for supervisor in supervisors.iter() {
2385                {
2386                    let faults = supervisor.faults.lock().unwrap();
2387                    assert!(faults.is_empty());
2388                }
2389            }
2390        });
2391    }
2392
2393    #[test_traced]
2394    #[ignore]
2395    fn test_outdated() {
2396        for seed in 0..5 {
2397            outdated(seed);
2398        }
2399    }
2400
2401    #[test_traced]
2402    #[ignore]
2403    fn test_1k() {
2404        // Create context
2405        let n = 10;
2406        let required_containers = 1_000;
2407        let activity_timeout = 10;
2408        let skip_timeout = 5;
2409        let namespace = b"consensus".to_vec();
2410        let cfg = deterministic::Config::new();
2411        let executor = deterministic::Runner::new(cfg);
2412        executor.start(|context| async move {
2413            // Create simulated network
2414            let (network, mut oracle) = Network::new(
2415                context.with_label("network"),
2416                Config {
2417                    max_size: 1024 * 1024,
2418                },
2419            );
2420
2421            // Start network
2422            network.start();
2423
2424            // Register participants
2425            let mut schemes = Vec::new();
2426            let mut validators = Vec::new();
2427            for i in 0..n {
2428                let scheme = PrivateKey::from_seed(i as u64);
2429                let pk = scheme.public_key();
2430                schemes.push(scheme);
2431                validators.push(pk);
2432            }
2433            validators.sort();
2434            schemes.sort_by_key(|s| s.public_key());
2435            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2436            let mut registrations = register_validators(&mut oracle, &validators).await;
2437
2438            // Link all validators
2439            let link = Link {
2440                latency: Duration::from_millis(80),
2441                jitter: Duration::from_millis(10),
2442                success_rate: 0.98,
2443            };
2444            link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2445
2446            // Create engines
2447            let relay = Arc::new(mocks::relay::Relay::new());
2448            let mut supervisors = Vec::new();
2449            let mut engine_handlers = Vec::new();
2450            for scheme in schemes.into_iter() {
2451                // Create scheme context
2452                let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2453
2454                // Start engine
2455                let validator = scheme.public_key();
2456                let supervisor_config = mocks::supervisor::Config {
2457                    namespace: namespace.clone(),
2458                    participants: view_validators.clone(),
2459                };
2460                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2461                    supervisor_config,
2462                );
2463                supervisors.push(supervisor.clone());
2464                let application_cfg = mocks::application::Config {
2465                    hasher: Sha256::default(),
2466                    relay: relay.clone(),
2467                    participant: validator.clone(),
2468                    propose_latency: (100.0, 50.0),
2469                    verify_latency: (50.0, 40.0),
2470                };
2471                let (actor, application) = mocks::application::Application::new(
2472                    context.with_label("application"),
2473                    application_cfg,
2474                );
2475                actor.start();
2476                let cfg = config::Config {
2477                    crypto: scheme,
2478                    automaton: application.clone(),
2479                    relay: application.clone(),
2480                    reporter: supervisor.clone(),
2481                    partition: validator.to_string(),
2482                    supervisor,
2483                    mailbox_size: 1024,
2484                    namespace: namespace.clone(),
2485                    leader_timeout: Duration::from_secs(1),
2486                    notarization_timeout: Duration::from_secs(2),
2487                    nullify_retry: Duration::from_secs(10),
2488                    fetch_timeout: Duration::from_secs(1),
2489                    activity_timeout,
2490                    skip_timeout,
2491                    max_fetch_count: 1,
2492                    max_participants: n as usize,
2493                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2494                    fetch_concurrent: 1,
2495                    replay_buffer: NZUsize!(1024 * 1024),
2496                    write_buffer: NZUsize!(1024 * 1024),
2497                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2498                };
2499                let (voter, resolver) = registrations
2500                    .remove(&validator)
2501                    .expect("validator should be registered");
2502                let engine = Engine::new(context.with_label("engine"), cfg);
2503                engine_handlers.push(engine.start(voter, resolver));
2504            }
2505
2506            // Wait for all engines to finish
2507            let mut finalizers = Vec::new();
2508            for supervisor in supervisors.iter_mut() {
2509                let (mut latest, mut monitor) = supervisor.subscribe().await;
2510                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2511                    while latest < required_containers {
2512                        latest = monitor.next().await.expect("event missing");
2513                    }
2514                }));
2515            }
2516            join_all(finalizers).await;
2517
2518            // Check supervisors for correct activity
2519            for supervisor in supervisors.iter() {
2520                // Ensure no faults
2521                {
2522                    let faults = supervisor.faults.lock().unwrap();
2523                    assert!(faults.is_empty());
2524                }
2525            }
2526        })
2527    }
2528}