// commonware_consensus/aggregation/mod.rs

1//! Recover threshold signatures over an externally synchronized sequencer of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce threshold signatures
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
//! _For applications that want to collect threshold signatures over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
13//!
14//! # Architecture
15//!
16//! The core of the module is the [Engine]. It manages the agreement process by:
17//! - Requesting externally synchronized [commonware_cryptography::Digest]s
18//! - Signing said digests with BLS [commonware_cryptography::bls12381::primitives::poly::PartialSignature]
19//! - Multicasting partial signatures to other validators
20//! - Recovering [commonware_cryptography::bls12381::primitives::poly::Signature]s from a quorum of partial signatures
21//! - Monitoring recovery progress and notifying the application layer of recoveries
22//!
23//! The engine interacts with four main components:
24//! - [crate::Automaton]: Provides external digests
25//! - [crate::Reporter]: Receives agreement confirmations
26//! - [crate::Monitor]: Tracks epoch transitions
27//! - [crate::ThresholdSupervisor]: Manages validator sets and network identities
28//!
29//! # Design Decisions
30//!
31//! ## Missing Signature Resolution
32//!
33//! The engine does not try to "fill gaps" when threshold signatures are missing. When validators
34//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
35//! signatures may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the threshold signature for any skipped index.
37//!
38//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
39//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
40//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
41//! historical results.
42//!
43//! ## Recovering Threshold Signatures
44//!
45//! In aggregation, participants never gossip recovered threshold signatures. Rather, they gossip [types::TipAck]s
46//! with partial signatures over some index and their latest tip. This approach reduces the overhead of running aggregation
47//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
48//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
49//! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest
50//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).
51
/// Types used throughout aggregation (e.g. [types::Certificate] and
/// [types::TipAck]); available on all targets.
pub mod types;

cfg_if::cfg_if! {
    // The modules below are only compiled on non-wasm32 targets; `types`
    // remains available everywhere.
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Mock implementations of the engine's dependencies, used by tests.
        #[cfg(test)]
        pub mod mocks;
    }
}
67
68#[cfg(test)]
69mod tests {
70    use super::{mocks, types::Epoch, Config, Engine};
71    use crate::aggregation::mocks::Strategy;
72    use commonware_cryptography::{
73        bls12381::{
74            dkg::ops,
75            primitives::{
76                group::Share,
77                poly,
78                variant::{MinPk, MinSig, Variant},
79            },
80        },
81        ed25519::{PrivateKey, PublicKey},
82        sha256::Digest as Sha256Digest,
83        PrivateKeyExt as _, Signer as _,
84    };
85    use commonware_macros::{select, test_traced};
86    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87    use commonware_runtime::{
88        buffer::PoolRef,
89        deterministic::{self, Context},
90        Clock, Metrics, Runner, Spawner,
91    };
92    use commonware_utils::{NZUsize, NonZeroDuration};
93    use futures::{channel::oneshot, future::join_all};
94    use rand::{rngs::StdRng, Rng, SeedableRng};
95    use std::{
96        collections::{BTreeMap, HashMap},
97        num::NonZeroUsize,
98        sync::{Arc, Mutex},
99        time::Duration,
100    };
101    use tracing::debug;
102
    /// Map from participant public key to its simulated network channel pair.
    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;

    // Page size for each engine's journal buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    // Number of pages cached by each engine's journal buffer pool.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Reliable network link configuration for testing.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
114
115    /// Register all participants with the network oracle.
116    async fn register_participants(
117        oracle: &mut Oracle<PublicKey>,
118        participants: &[PublicKey],
119    ) -> Registrations<PublicKey> {
120        let mut registrations = BTreeMap::new();
121        for participant in participants.iter() {
122            let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
123            registrations.insert(participant.clone(), (sender, receiver));
124        }
125        registrations
126    }
127
128    /// Establish network links between all participants.
129    async fn link_participants(
130        oracle: &mut Oracle<PublicKey>,
131        participants: &[PublicKey],
132        link: Link,
133    ) {
134        for v1 in participants.iter() {
135            for v2 in participants.iter() {
136                if v2 == v1 {
137                    continue;
138                }
139                oracle
140                    .add_link(v1.clone(), v2.clone(), link.clone())
141                    .await
142                    .unwrap();
143            }
144        }
145    }
146
147    /// Initialize a simulated network environment.
148    async fn initialize_simulation(
149        context: Context,
150        num_validators: u32,
151        shares_vec: &mut [Share],
152        link: Link,
153    ) -> (
154        Oracle<PublicKey>,
155        Vec<(PublicKey, PrivateKey, Share)>,
156        Vec<PublicKey>,
157        Registrations<PublicKey>,
158    ) {
159        let (network, mut oracle) = Network::new(
160            context.with_label("network"),
161            commonware_p2p::simulated::Config {
162                max_size: 1024 * 1024,
163            },
164        );
165        network.start();
166
167        let mut schemes = (0..num_validators)
168            .map(|i| PrivateKey::from_seed(i as u64))
169            .collect::<Vec<_>>();
170        schemes.sort_by_key(|s| s.public_key());
171        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
172            .iter()
173            .enumerate()
174            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
175            .collect();
176        let pks = validators
177            .iter()
178            .map(|(pk, _, _)| pk.clone())
179            .collect::<Vec<_>>();
180
181        let registrations = register_participants(&mut oracle, &pks).await;
182        link_participants(&mut oracle, &pks, link).await;
183        (oracle, validators, pks, registrations)
184    }
185
    /// Spawn aggregation engines for all validators.
    ///
    /// For each validator, this wires up a monitor, supervisor, blocker,
    /// automaton, and reporter, then starts an [Engine] over the validator's
    /// registered network channels (consuming them from `registrations`).
    ///
    /// Validators whose position in `validators` appears in `incorrect` get
    /// a [Strategy::Incorrect] automaton; all others use [Strategy::Correct].
    ///
    /// Returns the per-validator [mocks::Monitor]s keyed by public key.
    #[allow(clippy::too_many_arguments)]
    fn spawn_validator_engines<V: Variant>(
        context: Context,
        polynomial: poly::Public<V>,
        validator_pks: &[PublicKey],
        validators: &[(PublicKey, PrivateKey, Share)],
        registrations: &mut Registrations<PublicKey>,
        automatons: &mut BTreeMap<PublicKey, mocks::Application>,
        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
        oracle: &mut Oracle<PublicKey>,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> HashMap<PublicKey, mocks::Monitor> {
        let mut monitors = HashMap::new();
        let namespace = b"my testing namespace";

        for (i, (validator, _, share)) in validators.iter().enumerate() {
            let context = context.with_label(&validator.to_string());
            // All engines start in epoch 111; the supervisor below registers
            // the validator set and share under the same epoch.
            let monitor = mocks::Monitor::new(111);
            monitors.insert(validator.clone(), monitor.clone());
            let supervisor = {
                let identity = *poly::public::<V>(&polynomial);
                let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                s.add_epoch(
                    111,
                    share.clone(),
                    polynomial.clone(),
                    validator_pks.to_vec(),
                );
                s
            };

            let blocker = oracle.control(validator.clone());

            // Validators listed in `incorrect` produce bad digests.
            let automaton = mocks::Application::new(if incorrect.contains(&i) {
                Strategy::Incorrect
            } else {
                Strategy::Correct
            });
            automatons.insert(validator.clone(), automaton.clone());

            let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                namespace,
                validator_pks.len() as u32,
                polynomial.clone(),
            );
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(validator.clone(), reporter_mailbox);

            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    validators: supervisor,
                    automaton: automaton.clone(),
                    reporter: reporters.get(validator).unwrap().clone(),
                    blocker,
                    namespace: namespace.to_vec(),
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (1, 1),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: 100,
                    // Unique partition per validator so journals don't collide.
                    journal_partition: format!("aggregation/{validator}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            );

            let (sender, receiver) = registrations.remove(validator).unwrap();
            engine.start((sender, receiver));
        }
        monitors
    }
264
    /// Wait for all reporters to reach the specified consensus threshold.
    ///
    /// Spawns one watcher task per reporter that polls its tip every 100ms
    /// and signals a oneshot channel once the tip reaches both
    /// `threshold_index` and `threshold_epoch`. Panics if any watcher is
    /// cancelled before signaling.
    async fn await_reporters<V: Variant>(
        context: Context,
        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
        threshold_index: u64,
        threshold_epoch: Epoch,
    ) {
        let mut receivers = Vec::new();
        for (reporter, mailbox) in reporters.iter() {
            // Create a oneshot channel to signal when the reporter has reached the threshold.
            let (tx, rx) = oneshot::channel();
            receivers.push(rx);

            context.with_label("reporter_watcher").spawn({
                let reporter = reporter.clone();
                let mut mailbox = mailbox.clone();
                move |context| async move {
                    loop {
                        // A missing tip is treated as (0, 0), i.e. no progress yet.
                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
                        debug!(
                            index,
                            epoch,
                            threshold_index,
                            threshold_epoch,
                            ?reporter,
                            "reporter status"
                        );
                        if index >= threshold_index && epoch >= threshold_epoch {
                            debug!(
                                ?reporter,
                                "reporter reached threshold, signaling completion"
                            );
                            // Receiver may already be gone; ignore send errors.
                            let _ = tx.send(reporter.clone());
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });
        }

        // Wait for all oneshot receivers to complete.
        let results = join_all(receivers).await;
        assert_eq!(results.len(), reporters.len());

        // Check that none were cancelled.
        for result in results {
            assert!(result.is_ok(), "reporter was cancelled");
        }
    }
315
316    /// Test aggregation consensus with all validators online.
317    fn all_online<V: Variant>() {
318        let num_validators: u32 = 4;
319        let quorum: u32 = 3;
320        let runner = deterministic::Runner::timed(Duration::from_secs(30));
321
322        runner.start(|mut context| async move {
323            let (polynomial, mut shares_vec) =
324                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
325            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
326
327            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
328                context.with_label("simulation"),
329                num_validators,
330                &mut shares_vec,
331                RELIABLE_LINK,
332            )
333            .await;
334            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
335            let mut reporters =
336                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
337            spawn_validator_engines::<V>(
338                context.with_label("validator"),
339                polynomial.clone(),
340                &pks,
341                &validators,
342                &mut registrations,
343                &mut automatons.lock().unwrap(),
344                &mut reporters,
345                &mut oracle,
346                Duration::from_secs(5),
347                vec![],
348            );
349            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
350        });
351    }
352
    #[test_traced("INFO")]
    fn test_all_online() {
        // Run the scenario under both BLS12-381 variants.
        all_online::<MinPk>();
        all_online::<MinSig>();
    }
358
359    /// Test consensus resilience to Byzantine behavior.
360    fn byzantine_proposer<V: Variant>() {
361        let num_validators: u32 = 4;
362        let quorum: u32 = 3;
363        let runner = deterministic::Runner::timed(Duration::from_secs(30));
364
365        runner.start(|mut context| async move {
366            let (polynomial, mut shares_vec) =
367                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
368            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369
370            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
371                context.with_label("simulation"),
372                num_validators,
373                &mut shares_vec,
374                RELIABLE_LINK,
375            )
376            .await;
377            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
378            let mut reporters =
379                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
380
381            spawn_validator_engines::<V>(
382                context.with_label("validator"),
383                polynomial.clone(),
384                &pks,
385                &validators,
386                &mut registrations,
387                &mut automatons.lock().unwrap(),
388                &mut reporters,
389                &mut oracle,
390                Duration::from_secs(5),
391                vec![0],
392            );
393
394            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
395        });
396    }
397
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        // Run the scenario under both BLS12-381 variants.
        byzantine_proposer::<MinPk>();
        byzantine_proposer::<MinSig>();
    }
403
    /// Repeatedly abort and restart the whole network mid-run to verify that
    /// journal replay recovers state after unclean shutdowns, with one
    /// Byzantine validator in the mix.
    ///
    /// Each iteration runs inside a deterministic runtime, is interrupted
    /// after a random delay, and the next iteration resumes from the
    /// recovered runtime context. The test succeeds once the shared reporter
    /// observes a contiguous certificate history up to `target_index` after
    /// at least `min_shutdowns` restarts.
    fn unclean_byzantine_shutdown<V: Variant>() {
        // Test parameters
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let target_index = 200; // Target multiple rounds of signing
        let min_shutdowns = 4; // Minimum number of shutdowns per validator
        let max_shutdowns = 10; // Maximum number of shutdowns per validator
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);

        // Must be shorter than the maximum shutdown range to make progress after restarting
        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        let mut prev_ctx = None;
        let all_validators = Arc::new(Mutex::new(Vec::new()));

        // Generate shares once
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, mut shares_vec) =
            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
        let identity = *poly::public::<V>(&polynomial);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        // Continue until shared reporter reaches target or max shutdowns exceeded
        let mut shutdown_count = 0;
        while shutdown_count < max_shutdowns {
            let all_validators = all_validators.clone();
            let mut shares_vec = shares_vec.clone();
            let polynomial = polynomial.clone();
            let f = move |mut context: Context| {
                async move {
                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        num_validators,
                        &mut shares_vec,
                        RELIABLE_LINK,
                    )
                    .await;
                    // Store all validator public keys if not already done
                    if all_validators.lock().unwrap().is_empty() {
                        let mut pks_lock = all_validators.lock().unwrap();
                        *pks_lock = pks.clone();
                    }
                    let automatons =
                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));

                    // Use unique journal partitions for each validator to enable restart recovery
                    let mut engine_monitors = HashMap::new();
                    let namespace = b"my testing namespace";

                    // Create a shared reporter
                    //
                    // We rely on replay to populate this reporter with a contiguous history of certificates.
                    let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                        namespace,
                        num_validators,
                        polynomial.clone(),
                    );
                    context.with_label("reporter").spawn(|_| reporter.run());

                    // Start validator engines
                    for (i, (validator, _, share)) in validators.iter().enumerate() {
                        let validator_context = context.with_label(&validator.to_string());
                        let monitor = mocks::Monitor::new(111);
                        engine_monitors.insert(validator.clone(), monitor.clone());
                        let supervisor = {
                            let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
                            s
                        };

                        let blocker = oracle.control(validator.clone());
                        // Validator 0 is Byzantine: its automaton produces
                        // incorrect digests for every signing request.
                        let automaton = mocks::Application::new(if i == 0 {
                            Strategy::Incorrect
                        } else {
                            Strategy::Correct
                        });
                        automatons
                            .lock()
                            .unwrap()
                            .insert(validator.clone(), automaton.clone());

                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                validators: supervisor,
                                automaton,
                                reporter: reporter_mailbox.clone(),
                                blocker,
                                namespace: namespace.to_vec(),
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (1, 1),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                activity_timeout: 1_024, // ensure we don't drop any certificates
                                journal_partition: format!("unclean_shutdown_test/{validator}"),
                                journal_write_buffer: NZUsize!(4096),
                                journal_replay_buffer: NZUsize!(4096),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                            },
                        );

                        let (sender, receiver) = registrations.remove(validator).unwrap();
                        engine.start((sender, receiver));
                    }

                    // Create a single completion watcher for the shared reporter
                    let completion =
                        context
                            .with_label("completion_watcher")
                            .spawn(move |context| async move {
                                loop {
                                    if let Some(tip_index) =
                                        reporter_mailbox.get_contiguous_tip().await
                                    {
                                        if tip_index >= target_index {
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            });

                    // Random shutdown timing to simulate unclean shutdown
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            (false, context) // Unclean shutdown
                        },
                        _ = completion => {
                            debug!("Shared reporter completed normally");
                            (true, context) // Clean completion
                        },
                    }
                }
            };

            // Resume from the recovered context if a previous iteration ran;
            // otherwise start a fresh timed runtime.
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                debug!(shutdown_count, "Restarting from previous context");
                deterministic::Runner::from(prev_ctx)
            } else {
                debug!("Starting initial run");
                deterministic::Runner::timed(Duration::from_secs(45))
            }
            .start(f);
            if complete && shutdown_count >= min_shutdowns {
                debug!("Test completed successfully");
                break;
            }

            // Preserve runtime state (journals, clock) for the next iteration.
            prev_ctx = Some(context.recover());
            shutdown_count += 1;
        }
    }
562
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        // Run the scenario under both BLS12-381 variants.
        unclean_byzantine_shutdown::<MinPk>();
        unclean_byzantine_shutdown::<MinSig>();
    }
568
    /// Run aggregation over degraded links (200ms latency, 150ms jitter, 50%
    /// success rate) with the given RNG seed, and return the deterministic
    /// runtime's auditor state (used by [test_determinism] to compare runs).
    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        // Generous timeout: lossy links slow down recovery considerably.
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(120)));
        let runner = deterministic::Runner::new(cfg);

        runner.start(|mut context| async move {
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

            // Use degraded network links with realistic conditions
            let degraded_link = Link {
                latency: Duration::from_millis(200),
                jitter: Duration::from_millis(150),
                success_rate: 0.5,
            };

            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                degraded_link,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            spawn_validator_engines::<V>(
                context.with_label("validator"),
                polynomial.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut automatons.lock().unwrap(),
                &mut reporters,
                &mut oracle,
                Duration::from_secs(2),
                vec![],
            );

            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;

            // Expose the runtime's audit trail so callers can compare runs.
            context.auditor().state()
        })
    }
618
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        // Run the scenario under both BLS12-381 variants.
        slow_and_lossy_links::<MinPk>(0);
        slow_and_lossy_links::<MinSig>(0);
    }
624
625    #[test_traced("INFO")]
626    fn test_determinism() {
627        // We use slow and lossy links as the deterministic test
628        // because it is the most complex test.
629        for seed in 1..6 {
630            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
631            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
632            assert_eq!(pk_state_1, pk_state_2);
633
634            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
635            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
636            assert_eq!(sig_state_1, sig_state_2);
637
638            // Sanity check that different types can't be identical.
639            assert_ne!(pk_state_1, sig_state_1);
640        }
641    }
642
643    fn one_offline<V: Variant>() {
644        let num_validators: u32 = 5;
645        let quorum: u32 = 3;
646        let runner = deterministic::Runner::timed(Duration::from_secs(30));
647
648        runner.start(|mut context| async move {
649            let (polynomial, mut shares_vec) =
650                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
651            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
652
653            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
654                context.with_label("simulation"),
655                num_validators,
656                &mut shares_vec,
657                RELIABLE_LINK,
658            )
659            .await;
660            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
661            let mut reporters =
662                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
663
664            // Start only 4 out of 5 validators (one offline)
665            let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
666            let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
667
668            spawn_validator_engines::<V>(
669                context.with_label("validator"),
670                polynomial.clone(),
671                &online_pks,
672                &online_validators,
673                &mut registrations,
674                &mut automatons.lock().unwrap(),
675                &mut reporters,
676                &mut oracle,
677                Duration::from_secs(5),
678                vec![],
679            );
680            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
681        });
682    }
683
    #[test_traced("INFO")]
    fn test_one_offline() {
        // Run the scenario under both BLS12-381 variants.
        one_offline::<MinPk>();
        one_offline::<MinSig>();
    }
689
    /// Test consensus recovery after a network partition.
    ///
    /// Every link is removed (total partition), the network idles for 20s,
    /// connectivity is restored, and reporters are then expected to reach
    /// the threshold — proving the engines resume progress after healing.
    fn network_partition<V: Variant>() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let runner = deterministic::Runner::timed(Duration::from_secs(60));

        runner.start(|mut context| async move {
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                RELIABLE_LINK,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            spawn_validator_engines::<V>(
                context.with_label("validator"),
                polynomial.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut automatons.lock().unwrap(),
                &mut reporters,
                &mut oracle,
                Duration::from_secs(5),
                vec![],
            );

            // Sever every directed link to create a total partition.
            for v1 in pks.iter() {
                for v2 in pks.iter() {
                    if v2 == v1 {
                        continue;
                    }
                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                }
            }
            // Let the engines run partitioned for a while.
            context.sleep(Duration::from_secs(20)).await;

            // Restore full connectivity with reliable links.
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            for v1 in pks.iter() {
                for v2 in pks.iter() {
                    if v2 == v1 {
                        continue;
                    }
                    oracle
                        .add_link(v1.clone(), v2.clone(), link.clone())
                        .await
                        .unwrap();
                }
            }

            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
        });
    }
755
    #[test_traced("INFO")]
    fn test_network_partition() {
        // Run the partition-and-recovery scenario under both BLS12-381 variants.
        network_partition::<MinPk>();
        network_partition::<MinSig>();
    }
761
    /// Test insufficient validator participation (below quorum).
    ///
    /// Only 2 of 5 validators are started while the threshold scheme requires a
    /// quorum of 3 partial signatures, so no threshold signature can ever be
    /// recovered. After a generous waiting period, every reporter's tip must
    /// still be at its initial value.
    fn insufficient_validators<V: Variant>() {
        let num_validators: u32 = 5;
        let quorum: u32 = 3;
        // Timed runner bounds the whole test; we deliberately sleep 12s inside,
        // leaving headroom before the 15s deadline.
        let runner = deterministic::Runner::timed(Duration::from_secs(15));

        runner.start(|mut context| async move {
            // Deal threshold shares and sort them by index so shares line up
            // with validator order.
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
            let identity = *poly::public::<V>(&polynomial);

            let (oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                RELIABLE_LINK,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            // Start only 2 out of 5 validators (below quorum of 3)
            let namespace = b"my testing namespace";
            for (validator, _scheme, share) in validators.iter().take(2) {
                // Shadowed per-validator context so spawned tasks carry the
                // validator's label.
                let context = context.with_label(&validator.to_string());
                let monitor = mocks::Monitor::new(111);
                let supervisor = {
                    let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                    s.add_epoch(
                        111,
                        share.clone(),
                        polynomial.clone(),
                        pks.to_vec(),
                    );
                    s
                };

                let blocker = oracle.control(validator.clone());

                // Honest application: always produces the expected digest.
                let automaton = mocks::Application::new(Strategy::Correct);
                automatons.lock().unwrap().insert(validator.clone(), automaton.clone());

                // Reporter verifies recovered threshold signatures and tracks
                // the confirmed tip for this validator.
                let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                    namespace,
                    pks.len() as u32,
                    polynomial.clone(),
                );
                context.with_label("reporter").spawn(|_| reporter.run());
                reporters.insert(validator.clone(), reporter_mailbox);

                let engine = Engine::new(
                    context.with_label("engine"),
                    Config {
                        monitor,
                        validators: supervisor,
                        automaton: automaton.clone(),
                        reporter: reporters.get(validator).unwrap().clone(),
                        blocker,
                        namespace: namespace.to_vec(),
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                        epoch_bounds: (1, 1),
                        window: std::num::NonZeroU64::new(10).unwrap(),
                        activity_timeout: 100,
                        journal_partition: format!("aggregation/{validator}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    },
                );

                // Hand the engine its network endpoints and start it.
                let (sender, receiver) = registrations.remove(validator).unwrap();
                engine.start((sender, receiver));
            }

            // With insufficient validators, consensus should not be achievable
            // Wait long enough for any potential consensus attempts to complete
            context.sleep(Duration::from_secs(12)).await;

            // Check that no validator achieved consensus through verified threshold signatures
            let mut any_consensus = false;
            for (validator_pk, mut reporter_mailbox) in reporters {
                // Default to tip 0 if the reporter never observed a confirmation.
                let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
                if tip > 0 {
                    any_consensus = true;
                    tracing::warn!(
                        ?validator_pk,
                        tip,
                        "Unexpected threshold signature consensus with insufficient validators"
                    );
                }
            }

            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
            assert!(
                !any_consensus,
                "Consensus should not be achieved with insufficient validator participation (below quorum)"
            );
        });
    }
866
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        // Run the below-quorum scenario under both BLS12-381 variants.
        insufficient_validators::<MinPk>();
        insufficient_validators::<MinSig>();
    }
872}