commonware_consensus/aggregation/
mod.rs

1//! Recover threshold signatures over an externally synchronized sequencer of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce threshold signatures
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
//! _For applications that want to collect threshold signatures over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
13//!
14//! # Architecture
15//!
16//! The core of the module is the [Engine]. It manages the agreement process by:
17//! - Requesting externally synchronized [commonware_cryptography::Digest]s
18//! - Signing said digests with BLS [commonware_cryptography::bls12381::primitives::poly::PartialSignature]
19//! - Multicasting partial signatures to other validators
20//! - Recovering [commonware_cryptography::bls12381::primitives::poly::Signature]s from a quorum of partial signatures
21//! - Monitoring recovery progress and notifying the application layer of recoveries
22//!
23//! The engine interacts with four main components:
24//! - [crate::Automaton]: Provides external digests
25//! - [crate::Reporter]: Receives agreement confirmations
26//! - [crate::Monitor]: Tracks epoch transitions
27//! - [crate::ThresholdSupervisor]: Manages validator sets and network identities
28//!
29//! # Design Decisions
30//!
31//! ## Missing Signature Resolution
32//!
33//! The engine does not try to "fill gaps" when threshold signatures are missing. When validators
34//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
35//! signatures may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the threshold signature for any skipped index.
37//!
38//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
39//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
40//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
41//! historical results.
42//!
43//! ## Recovering Threshold Signatures
44//!
45//! In aggregation, participants never gossip recovered threshold signatures. Rather, they gossip [types::TipAck]s
46//! with partial signatures over some index and their latest tip. This approach reduces the overhead of running aggregation
47//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
48//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
49//! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest
50//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).
51
52pub mod types;
53
cfg_if::cfg_if! {
    // The full engine (config, engine, metrics, safe_tip) is only compiled for
    // non-wasm32 targets; wasm builds get just the wire [types] above.
    // NOTE(review): presumably gated because the engine depends on runtime
    // facilities unavailable on wasm32 — confirm.
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Mock implementations of the engine's dependencies, used by the
        // tests below.
        #[cfg(test)]
        pub mod mocks;
    }
}
67
68#[cfg(test)]
69mod tests {
70    use super::{mocks, Config, Engine};
71    use crate::{aggregation::mocks::Strategy, types::Epoch};
72    use commonware_cryptography::{
73        bls12381::{
74            dkg::ops,
75            primitives::{
76                group::Share,
77                poly,
78                variant::{MinPk, MinSig, Variant},
79            },
80        },
81        ed25519::{PrivateKey, PublicKey},
82        sha256::Digest as Sha256Digest,
83        PrivateKeyExt as _, Signer as _,
84    };
85    use commonware_macros::{select, test_traced};
86    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87    use commonware_runtime::{
88        buffer::PoolRef,
89        deterministic::{self, Context},
90        Clock, Metrics, Runner, Spawner,
91    };
92    use commonware_utils::{NZUsize, NonZeroDuration};
93    use futures::{channel::oneshot, future::join_all};
94    use rand::{rngs::StdRng, Rng, SeedableRng};
95    use std::{
96        collections::{BTreeMap, HashMap},
97        num::NonZeroUsize,
98        sync::{Arc, Mutex},
99        time::Duration,
100    };
101    use tracing::debug;
102
    /// Map from a participant's public key to the network sender/receiver pair
    /// created for it by the simulated network oracle.
    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;

    // Journal buffer-pool sizing shared by every engine config in these tests.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Reliable network link configuration for testing.
    ///
    /// Low latency/jitter with a 100% delivery rate so tests exercise the
    /// protocol itself rather than network loss.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
114
115    /// Register all participants with the network oracle.
116    async fn register_participants(
117        oracle: &mut Oracle<PublicKey>,
118        participants: &[PublicKey],
119    ) -> Registrations<PublicKey> {
120        let mut registrations = BTreeMap::new();
121        for participant in participants.iter() {
122            let (sender, receiver) = oracle
123                .control(participant.clone())
124                .register(0)
125                .await
126                .unwrap();
127            registrations.insert(participant.clone(), (sender, receiver));
128        }
129        registrations
130    }
131
132    /// Establish network links between all participants.
133    async fn link_participants(
134        oracle: &mut Oracle<PublicKey>,
135        participants: &[PublicKey],
136        link: Link,
137    ) {
138        for v1 in participants.iter() {
139            for v2 in participants.iter() {
140                if v2 == v1 {
141                    continue;
142                }
143                oracle
144                    .add_link(v1.clone(), v2.clone(), link.clone())
145                    .await
146                    .unwrap();
147            }
148        }
149    }
150
151    /// Initialize a simulated network environment.
152    async fn initialize_simulation(
153        context: Context,
154        num_validators: u32,
155        shares_vec: &mut [Share],
156        link: Link,
157    ) -> (
158        Oracle<PublicKey>,
159        Vec<(PublicKey, PrivateKey, Share)>,
160        Vec<PublicKey>,
161        Registrations<PublicKey>,
162    ) {
163        let (network, mut oracle) = Network::new(
164            context.with_label("network"),
165            commonware_p2p::simulated::Config {
166                max_size: 1024 * 1024,
167                disconnect_on_block: true,
168                tracked_peer_sets: None,
169            },
170        );
171        network.start();
172
173        let mut schemes = (0..num_validators)
174            .map(|i| PrivateKey::from_seed(i as u64))
175            .collect::<Vec<_>>();
176        schemes.sort_by_key(|s| s.public_key());
177        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
178            .iter()
179            .enumerate()
180            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
181            .collect();
182        let pks = validators
183            .iter()
184            .map(|(pk, _, _)| pk.clone())
185            .collect::<Vec<_>>();
186
187        let registrations = register_participants(&mut oracle, &pks).await;
188        link_participants(&mut oracle, &pks, link).await;
189        (oracle, validators, pks, registrations)
190    }
191
    /// Spawn aggregation engines for all validators.
    ///
    /// For each validator this builds a single-epoch (111) supervisor, a mock
    /// automaton (validators whose index appears in `incorrect` use
    /// `Strategy::Incorrect`), and a per-validator reporter, then starts an
    /// [Engine] wired to that validator's registered network channels.
    /// Returns the per-validator epoch monitors.
    #[allow(clippy::too_many_arguments)]
    fn spawn_validator_engines<V: Variant>(
        context: Context,
        polynomial: poly::Public<V>,
        validator_pks: &[PublicKey],
        validators: &[(PublicKey, PrivateKey, Share)],
        registrations: &mut Registrations<PublicKey>,
        automatons: &mut BTreeMap<PublicKey, mocks::Application>,
        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
        oracle: &mut Oracle<PublicKey>,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> HashMap<PublicKey, mocks::Monitor> {
        let mut monitors = HashMap::new();
        let namespace = b"my testing namespace";

        for (i, (validator, _, share)) in validators.iter().enumerate() {
            let context = context.with_label(&validator.to_string());
            let monitor = mocks::Monitor::new(111);
            monitors.insert(validator.clone(), monitor.clone());
            // Supervisor knows a single epoch (111) containing every validator.
            let supervisor = {
                let identity = *poly::public::<V>(&polynomial);
                let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                s.add_epoch(
                    111,
                    share.clone(),
                    polynomial.clone(),
                    validator_pks.to_vec(),
                );
                s
            };

            let blocker = oracle.control(validator.clone());

            // Validators at indices listed in `incorrect` run the Incorrect
            // strategy (Byzantine behavior); everyone else behaves honestly.
            let automaton = mocks::Application::new(if incorrect.contains(&i) {
                Strategy::Incorrect
            } else {
                Strategy::Correct
            });
            automatons.insert(validator.clone(), automaton.clone());

            let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                namespace,
                validator_pks.len() as u32,
                polynomial.clone(),
            );
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(validator.clone(), reporter_mailbox);

            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    validators: supervisor,
                    automaton: automaton.clone(),
                    reporter: reporters.get(validator).unwrap().clone(),
                    blocker,
                    namespace: namespace.to_vec(),
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (1, 1),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: 100,
                    // Per-validator partition so each engine journals (and can
                    // replay) independently.
                    journal_partition: format!("aggregation/{validator}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            );

            // Hand the engine its registered network channels and start it.
            let (sender, receiver) = registrations.remove(validator).unwrap();
            engine.start((sender, receiver));
        }
        monitors
    }
270
271    /// Wait for all reporters to reach the specified consensus threshold.
272    async fn await_reporters<V: Variant>(
273        context: Context,
274        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
275        threshold_index: u64,
276        threshold_epoch: Epoch,
277    ) {
278        let mut receivers = Vec::new();
279        for (reporter, mailbox) in reporters.iter() {
280            // Create a oneshot channel to signal when the reporter has reached the threshold.
281            let (tx, rx) = oneshot::channel();
282            receivers.push(rx);
283
284            context.with_label("reporter_watcher").spawn({
285                let reporter = reporter.clone();
286                let mut mailbox = mailbox.clone();
287                move |context| async move {
288                    loop {
289                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
290                        debug!(
291                            index,
292                            epoch,
293                            threshold_index,
294                            threshold_epoch,
295                            ?reporter,
296                            "reporter status"
297                        );
298                        if index >= threshold_index && epoch >= threshold_epoch {
299                            debug!(
300                                ?reporter,
301                                "reporter reached threshold, signaling completion"
302                            );
303                            let _ = tx.send(reporter.clone());
304                            break;
305                        }
306                        context.sleep(Duration::from_millis(100)).await;
307                    }
308                }
309            });
310        }
311
312        // Wait for all oneshot receivers to complete.
313        let results = join_all(receivers).await;
314        assert_eq!(results.len(), reporters.len());
315
316        // Check that none were cancelled.
317        for result in results {
318            assert!(result.is_ok(), "reporter was cancelled");
319        }
320    }
321
322    /// Test aggregation consensus with all validators online.
323    fn all_online<V: Variant>() {
324        let num_validators: u32 = 4;
325        let quorum: u32 = 3;
326        let runner = deterministic::Runner::timed(Duration::from_secs(30));
327
328        runner.start(|mut context| async move {
329            let (polynomial, mut shares_vec) =
330                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
331            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
332
333            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
334                context.with_label("simulation"),
335                num_validators,
336                &mut shares_vec,
337                RELIABLE_LINK,
338            )
339            .await;
340            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
341            let mut reporters =
342                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
343            spawn_validator_engines::<V>(
344                context.with_label("validator"),
345                polynomial.clone(),
346                &pks,
347                &validators,
348                &mut registrations,
349                &mut automatons.lock().unwrap(),
350                &mut reporters,
351                &mut oracle,
352                Duration::from_secs(5),
353                vec![],
354            );
355            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
356        });
357    }
358
    /// Runs the all-online scenario under both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_all_online() {
        all_online::<MinPk>();
        all_online::<MinSig>();
    }
364
    /// Test consensus resilience to Byzantine behavior.
    ///
    /// Same setup as [all_online], except the validator at index 0 runs
    /// `Strategy::Incorrect`. With quorum 3 of 4 validators honest, every
    /// reporter must still reach index 100 in epoch 111.
    fn byzantine_proposer<V: Variant>() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let runner = deterministic::Runner::timed(Duration::from_secs(30));

        runner.start(|mut context| async move {
            // Deal threshold shares and order them by participant index.
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                RELIABLE_LINK,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            // `vec![0]` marks validator index 0 to use Strategy::Incorrect.
            spawn_validator_engines::<V>(
                context.with_label("validator"),
                polynomial.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut automatons.lock().unwrap(),
                &mut reporters,
                &mut oracle,
                Duration::from_secs(5),
                vec![0],
            );

            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
        });
    }
403
    /// Runs the Byzantine-proposer scenario under both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        byzantine_proposer::<MinPk>();
        byzantine_proposer::<MinSig>();
    }
409
    /// Repeatedly run the network and kill it at random points, verifying that
    /// validators recover from their journals after each unclean shutdown.
    ///
    /// A shared reporter (repopulated via journal replay on every restart)
    /// must eventually observe a contiguous history up to `target_index`,
    /// while validator 0 behaves incorrectly throughout. Each iteration
    /// resumes the deterministic runtime from the previous checkpoint.
    fn unclean_byzantine_shutdown<V: Variant>() {
        // Test parameters
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let target_index = 200; // Target multiple rounds of signing
        let min_shutdowns = 4; // Minimum number of shutdowns per validator
        let max_shutdowns = 10; // Maximum number of shutdowns per validator
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);

        // Must be shorter than the maximum shutdown range to make progress after restarting
        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        let mut prev_checkpoint = None;
        let all_validators = Arc::new(Mutex::new(Vec::new()));

        // Generate shares once
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, mut shares_vec) =
            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
        let identity = *poly::public::<V>(&polynomial);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        // Continue until shared reporter reaches target or max shutdowns exceeded
        let mut shutdown_count = 0;
        while shutdown_count < max_shutdowns {
            let all_validators = all_validators.clone();
            let mut shares_vec = shares_vec.clone();
            let polynomial = polynomial.clone();
            // One simulated "process lifetime": returns true if the shared
            // reporter reached `target_index` before the scheduled shutdown.
            let f = move |mut context: Context| {
                async move {
                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        num_validators,
                        &mut shares_vec,
                        RELIABLE_LINK,
                    )
                    .await;
                    // Store all validator public keys if not already done
                    // (the guard taken for `is_empty()` is dropped before the
                    // block runs, so the second lock cannot deadlock).
                    if all_validators.lock().unwrap().is_empty() {
                        let mut pks_lock = all_validators.lock().unwrap();
                        *pks_lock = pks.clone();
                    }
                    let automatons =
                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));

                    // Use unique journal partitions for each validator to enable restart recovery
                    let mut engine_monitors = HashMap::new();
                    let namespace = b"my testing namespace";

                    // Create a shared reporter
                    //
                    // We rely on replay to populate this reporter with a contiguous history of certificates.
                    let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                        namespace,
                        num_validators,
                        polynomial.clone(),
                    );
                    context.with_label("reporter").spawn(|_| reporter.run());

                    // Start validator engines
                    for (i, (validator, _, share)) in validators.iter().enumerate() {
                        let validator_context = context.with_label(&validator.to_string());
                        let monitor = mocks::Monitor::new(111);
                        engine_monitors.insert(validator.clone(), monitor.clone());
                        let supervisor = {
                            let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
                            s
                        };

                        let blocker = oracle.control(validator.clone());
                        // Validator 0 is Byzantine for the whole test.
                        let automaton = mocks::Application::new(if i == 0 {
                            Strategy::Incorrect
                        } else {
                            Strategy::Correct
                        });
                        automatons
                            .lock()
                            .unwrap()
                            .insert(validator.clone(), automaton.clone());

                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                validators: supervisor,
                                automaton,
                                reporter: reporter_mailbox.clone(),
                                blocker,
                                namespace: namespace.to_vec(),
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (1, 1),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                activity_timeout: 1_024, // ensure we don't drop any certificates
                                journal_partition: format!("unclean_shutdown_test/{validator}"),
                                journal_write_buffer: NZUsize!(4096),
                                journal_replay_buffer: NZUsize!(4096),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                            },
                        );

                        let (sender, receiver) = registrations.remove(validator).unwrap();
                        engine.start((sender, receiver));
                    }

                    // Create a single completion watcher for the shared reporter
                    let completion =
                        context
                            .with_label("completion_watcher")
                            .spawn(move |context| async move {
                                loop {
                                    if let Some(tip_index) =
                                        reporter_mailbox.get_contiguous_tip().await
                                    {
                                        if tip_index >= target_index {
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            });

                    // Random shutdown timing to simulate unclean shutdown
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            false // Unclean shutdown
                        },
                        _ = completion => {
                            debug!("Shared reporter completed normally");
                            true // Clean completion
                        },
                    }
                }
            };

            // First iteration starts fresh with a 45s budget; later iterations
            // resume the deterministic runtime from the saved checkpoint.
            let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
                debug!(shutdown_count, "Restarting from previous context");
                deterministic::Runner::from(prev_checkpoint)
            } else {
                debug!("Starting initial run");
                deterministic::Runner::timed(Duration::from_secs(45))
            }
            .start_and_recover(f);
            // Only stop once we have both completed AND shut down uncleanly at
            // least `min_shutdowns` times.
            if complete && shutdown_count >= min_shutdowns {
                debug!("Test completed successfully");
                break;
            }

            prev_checkpoint = Some(checkpoint);
            shutdown_count += 1;
            // NOTE(review): if the loop exhausts `max_shutdowns` without a
            // completed run, the function returns without asserting success —
            // confirm whether this is intended or should panic.
        }
    }
568
    /// Runs the unclean-shutdown scenario under both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        unclean_byzantine_shutdown::<MinPk>();
        unclean_byzantine_shutdown::<MinSig>();
    }
574
575    fn unclean_shutdown_with_unsigned_index<V: Variant>() {
576        // Test parameters
577        let num_validators: u32 = 4;
578        let quorum: u32 = 3;
579        let skip_index = 50u64; // Index where no one will sign
580        let window = 10u64;
581        let target_index = 100u64;
582        let namespace = b"my testing namespace";
583
584        // Generate shares once
585        let all_validators = Arc::new(Mutex::new(Vec::new()));
586        let mut rng = StdRng::seed_from_u64(0);
587        let (polynomial, mut shares_vec) =
588            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
589        let identity = *poly::public::<V>(&polynomial);
590        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
591
592        // First run: let validators skip signing at skip_index and reach beyond it
593        let f = |context: Context| {
594            let all_validators_clone = all_validators.clone();
595            let mut shares_vec_clone = shares_vec.clone();
596            let polynomial_clone = polynomial.clone();
597            async move {
598                let (oracle, validators, pks, mut registrations) = initialize_simulation(
599                    context.with_label("simulation"),
600                    num_validators,
601                    &mut shares_vec_clone,
602                    RELIABLE_LINK,
603                )
604                .await;
605
606                // Store all validator public keys
607                {
608                    let mut pks_lock = all_validators_clone.lock().unwrap();
609                    *pks_lock = pks.clone();
610                }
611
612                // Create a shared reporter
613                let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
614                    namespace,
615                    num_validators,
616                    polynomial_clone.clone(),
617                );
618                context.with_label("reporter").spawn(|_| reporter.run());
619
620                // Start validator engines with NoSignature strategy for skip_index
621                let mut engine_monitors = HashMap::new();
622                let automatons =
623                    Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
624                for (validator, _, share) in validators.iter() {
625                    let validator_context = context.with_label(&validator.to_string());
626                    let monitor = mocks::Monitor::new(111);
627                    engine_monitors.insert(validator.clone(), monitor.clone());
628                    let supervisor = {
629                        let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
630                        s.add_epoch(111, share.clone(), polynomial_clone.clone(), pks.to_vec());
631                        s
632                    };
633                    let blocker = oracle.control(validator.clone());
634
635                    // All validators use NoSignature strategy for skip_index
636                    let automaton = mocks::Application::new(Strategy::Skip { index: skip_index });
637                    automatons
638                        .lock()
639                        .unwrap()
640                        .insert(validator.clone(), automaton.clone());
641
642                    let engine = Engine::new(
643                        validator_context.with_label("engine"),
644                        Config {
645                            monitor,
646                            validators: supervisor,
647                            automaton,
648                            reporter: reporter_mailbox.clone(),
649                            blocker,
650                            namespace: namespace.to_vec(),
651                            priority_acks: false,
652                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
653                                100,
654                            )),
655                            epoch_bounds: (1, 1),
656                            window: std::num::NonZeroU64::new(window).unwrap(),
657                            activity_timeout: 100,
658                            journal_partition: format!("unsigned_index_test/{validator}"),
659                            journal_write_buffer: NZUsize!(4096),
660                            journal_replay_buffer: NZUsize!(4096),
661                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
662                            journal_compression: Some(3),
663                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
664                        },
665                    );
666
667                    let (sender, receiver) = registrations.remove(validator).unwrap();
668                    engine.start((sender, receiver));
669                }
670
671                // Wait for validators to reach target_index (past skip_index)
672                loop {
673                    if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
674                        debug!(tip_index, skip_index, target_index, "reporter status");
675                        if tip_index >= skip_index + window - 1 {
676                            // max we can proceed before item confirmed
677                            return;
678                        }
679                    }
680                    context.sleep(Duration::from_millis(50)).await;
681                }
682            }
683        };
684        let (_, checkpoint) =
685            deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
686
687        // Second run: restart and verify the skip_index gets confirmed
688        let f2 = move |context: Context| {
689            async move {
690                let (oracle, validators, pks, mut registrations) = initialize_simulation(
691                    context.with_label("simulation"),
692                    num_validators,
693                    &mut shares_vec,
694                    RELIABLE_LINK,
695                )
696                .await;
697
698                // Create a shared reporter
699                let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
700                    namespace,
701                    num_validators,
702                    polynomial.clone(),
703                );
704                context.with_label("reporter").spawn(|_| reporter.run());
705
706                // Start validator engines with Correct strategy (will sign everything now)
707                let automatons =
708                    Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
709                for (validator, _, share) in validators.iter() {
710                    let validator_context = context.with_label(&validator.to_string());
711                    let monitor = mocks::Monitor::new(111);
712                    let supervisor = {
713                        let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
714                        s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
715                        s
716                    };
717
718                    let blocker = oracle.control(validator.clone());
719
720                    // Now all validators use Correct strategy
721                    let automaton = mocks::Application::new(Strategy::Correct);
722                    automatons
723                        .lock()
724                        .unwrap()
725                        .insert(validator.clone(), automaton.clone());
726
727                    let engine = Engine::new(
728                        validator_context.with_label("engine"),
729                        Config {
730                            monitor,
731                            validators: supervisor,
732                            automaton,
733                            reporter: reporter_mailbox.clone(),
734                            blocker,
735                            namespace: namespace.to_vec(),
736                            priority_acks: false,
737                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
738                                100,
739                            )),
740                            epoch_bounds: (1, 1),
741                            window: std::num::NonZeroU64::new(10).unwrap(),
742                            activity_timeout: 100,
743                            journal_partition: format!("unsigned_index_test/{validator}"),
744                            journal_write_buffer: NZUsize!(4096),
745                            journal_replay_buffer: NZUsize!(4096),
746                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
747                            journal_compression: Some(3),
748                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
749                        },
750                    );
751
752                    let (sender, receiver) = registrations.remove(validator).unwrap();
753                    engine.start((sender, receiver));
754                }
755
756                // Wait for skip_index to be confirmed (should happen on replay)
757                loop {
758                    if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
759                        debug!(
760                            tip_index,
761                            skip_index, target_index, "reporter status on restart",
762                        );
763                        if tip_index >= target_index {
764                            break;
765                        }
766                    }
767                    context.sleep(Duration::from_millis(50)).await;
768                }
769            }
770        };
771        deterministic::Runner::from(checkpoint).start(f2);
772    }
773
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_index() {
        // Run the unclean-shutdown scenario for both BLS12-381 variants.
        unclean_shutdown_with_unsigned_index::<MinPk>();
        unclean_shutdown_with_unsigned_index::<MinSig>();
    }
779
780    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
781        let num_validators: u32 = 4;
782        let quorum: u32 = 3;
783        let cfg = deterministic::Config::new()
784            .with_seed(seed)
785            .with_timeout(Some(Duration::from_secs(120)));
786        let runner = deterministic::Runner::new(cfg);
787
788        runner.start(|mut context| async move {
789            let (polynomial, mut shares_vec) =
790                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
791            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
792
793            // Use degraded network links with realistic conditions
794            let degraded_link = Link {
795                latency: Duration::from_millis(200),
796                jitter: Duration::from_millis(150),
797                success_rate: 0.5,
798            };
799
800            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
801                context.with_label("simulation"),
802                num_validators,
803                &mut shares_vec,
804                degraded_link,
805            )
806            .await;
807            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
808            let mut reporters =
809                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
810
811            spawn_validator_engines::<V>(
812                context.with_label("validator"),
813                polynomial.clone(),
814                &pks,
815                &validators,
816                &mut registrations,
817                &mut automatons.lock().unwrap(),
818                &mut reporters,
819                &mut oracle,
820                Duration::from_secs(2),
821                vec![],
822            );
823
824            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
825
826            context.auditor().state()
827        })
828    }
829
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        // Exercise both BLS12-381 variants under degraded network conditions.
        slow_and_lossy_links::<MinPk>(0);
        slow_and_lossy_links::<MinSig>(0);
    }
835
836    #[test_traced("INFO")]
837    fn test_determinism() {
838        // We use slow and lossy links as the deterministic test
839        // because it is the most complex test.
840        for seed in 1..6 {
841            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
842            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
843            assert_eq!(pk_state_1, pk_state_2);
844
845            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
846            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
847            assert_eq!(sig_state_1, sig_state_2);
848
849            // Sanity check that different types can't be identical.
850            assert_ne!(pk_state_1, sig_state_1);
851        }
852    }
853
854    fn one_offline<V: Variant>() {
855        let num_validators: u32 = 5;
856        let quorum: u32 = 3;
857        let runner = deterministic::Runner::timed(Duration::from_secs(30));
858
859        runner.start(|mut context| async move {
860            let (polynomial, mut shares_vec) =
861                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
862            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
863
864            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
865                context.with_label("simulation"),
866                num_validators,
867                &mut shares_vec,
868                RELIABLE_LINK,
869            )
870            .await;
871            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
872            let mut reporters =
873                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
874
875            // Start only 4 out of 5 validators (one offline)
876            let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
877            let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
878
879            spawn_validator_engines::<V>(
880                context.with_label("validator"),
881                polynomial.clone(),
882                &online_pks,
883                &online_validators,
884                &mut registrations,
885                &mut automatons.lock().unwrap(),
886                &mut reporters,
887                &mut oracle,
888                Duration::from_secs(5),
889                vec![],
890            );
891            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
892        });
893    }
894
    #[test_traced("INFO")]
    fn test_one_offline() {
        // Run the one-offline scenario for both BLS12-381 variants.
        one_offline::<MinPk>();
        one_offline::<MinSig>();
    }
900
901    /// Test consensus recovery after a network partition.
902    fn network_partition<V: Variant>() {
903        let num_validators: u32 = 4;
904        let quorum: u32 = 3;
905        let runner = deterministic::Runner::timed(Duration::from_secs(60));
906
907        runner.start(|mut context| async move {
908            let (polynomial, mut shares_vec) =
909                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
910            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
911
912            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
913                context.with_label("simulation"),
914                num_validators,
915                &mut shares_vec,
916                RELIABLE_LINK,
917            )
918            .await;
919            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
920            let mut reporters =
921                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
922
923            spawn_validator_engines::<V>(
924                context.with_label("validator"),
925                polynomial.clone(),
926                &pks,
927                &validators,
928                &mut registrations,
929                &mut automatons.lock().unwrap(),
930                &mut reporters,
931                &mut oracle,
932                Duration::from_secs(5),
933                vec![],
934            );
935
936            for v1 in pks.iter() {
937                for v2 in pks.iter() {
938                    if v2 == v1 {
939                        continue;
940                    }
941                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
942                }
943            }
944            context.sleep(Duration::from_secs(20)).await;
945
946            let link = Link {
947                latency: Duration::from_millis(10),
948                jitter: Duration::from_millis(1),
949                success_rate: 1.0,
950            };
951            for v1 in pks.iter() {
952                for v2 in pks.iter() {
953                    if v2 == v1 {
954                        continue;
955                    }
956                    oracle
957                        .add_link(v1.clone(), v2.clone(), link.clone())
958                        .await
959                        .unwrap();
960                }
961            }
962
963            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
964        });
965    }
966
    #[test_traced("INFO")]
    fn test_network_partition() {
        // Run the partition-and-heal scenario for both BLS12-381 variants.
        network_partition::<MinPk>();
        network_partition::<MinSig>();
    }
972
    /// Test insufficient validator participation (below quorum).
    ///
    /// Only 2 of 5 validators are started (quorum is 3); after waiting, no
    /// reporter may observe a tip above zero.
    fn insufficient_validators<V: Variant>() {
        let num_validators: u32 = 5;
        let quorum: u32 = 3;
        let runner = deterministic::Runner::timed(Duration::from_secs(15));

        runner.start(|mut context| async move {
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
            let identity = *poly::public::<V>(&polynomial);

            let (oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                RELIABLE_LINK,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            // Start only 2 out of 5 validators (below quorum of 3)
            let namespace = b"my testing namespace";
            for (validator, _scheme, share) in validators.iter().take(2) {
                let context = context.with_label(&validator.to_string());
                // Epoch 111 is the only epoch registered with the supervisor below.
                let monitor = mocks::Monitor::new(111);
                let supervisor = {
                    let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                    s.add_epoch(
                        111,
                        share.clone(),
                        polynomial.clone(),
                        pks.to_vec(),
                    );
                    s
                };

                let blocker = oracle.control(validator.clone());

                let automaton = mocks::Application::new(Strategy::Correct);
                automatons.lock().unwrap().insert(validator.clone(), automaton.clone());

                // Each running validator gets its own reporter so per-validator
                // tips can be inspected after the wait below.
                let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                    namespace,
                    pks.len() as u32,
                    polynomial.clone(),
                );
                context.with_label("reporter").spawn(|_| reporter.run());
                reporters.insert(validator.clone(), reporter_mailbox);

                let engine = Engine::new(
                    context.with_label("engine"),
                    Config {
                        monitor,
                        validators: supervisor,
                        automaton: automaton.clone(),
                        reporter: reporters.get(validator).unwrap().clone(),
                        blocker,
                        namespace: namespace.to_vec(),
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                        epoch_bounds: (1, 1),
                        window: std::num::NonZeroU64::new(10).unwrap(),
                        activity_timeout: 100,
                        journal_partition: format!("aggregation/{validator}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    },
                );

                let (sender, receiver) = registrations.remove(validator).unwrap();
                engine.start((sender, receiver));
            }

            // With insufficient validators, consensus should not be achievable
            // Wait long enough for any potential consensus attempts to complete
            context.sleep(Duration::from_secs(12)).await;

            // Check that no validator achieved consensus through verified threshold signatures
            let mut any_consensus = false;
            for (validator_pk, mut reporter_mailbox) in reporters {
                // A reporter with no tip at all counts as zero progress.
                let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
                if tip > 0 {
                    any_consensus = true;
                    tracing::warn!(
                        ?validator_pk,
                        tip,
                        "Unexpected threshold signature consensus with insufficient validators"
                    );
                }
            }

            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
            assert!(
                !any_consensus,
                "Consensus should not be achieved with insufficient validator participation (below quorum)"
            );
        });
    }
1077
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        // Run the below-quorum scenario for both BLS12-381 variants.
        insufficient_validators::<MinPk>();
        insufficient_validators::<MinSig>();
    }
1083}