// commonware_consensus/aggregation/mod.rs

1//! Recover threshold signatures over an externally synchronized sequencer of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce threshold signatures
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
//! _For applications that want to collect threshold signatures over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
13//!
14//! # Architecture
15//!
16//! The core of the module is the [Engine]. It manages the agreement process by:
17//! - Requesting externally synchronized [commonware_cryptography::Digest]s
18//! - Signing said digests with BLS [commonware_cryptography::bls12381::primitives::poly::PartialSignature]
19//! - Multicasting partial signatures to other validators
20//! - Recovering [commonware_cryptography::bls12381::primitives::poly::Signature]s from a quorum of partial signatures
21//! - Monitoring recovery progress and notifying the application layer of recoveries
22//!
23//! The engine interacts with four main components:
24//! - [crate::Automaton]: Provides external digests
25//! - [crate::Reporter]: Receives agreement confirmations
26//! - [crate::Monitor]: Tracks epoch transitions
27//! - [crate::ThresholdSupervisor]: Manages validator sets and network identities
28//!
29//! # Design Decisions
30//!
31//! ## Missing Signature Resolution
32//!
33//! The engine does not try to "fill gaps" when threshold signatures are missing. When validators
34//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
35//! signatures may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the threshold signature for any skipped index.
37//!
38//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
39//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
40//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
41//! historical results.
42//!
43//! ## Recovering Threshold Signatures
44//!
45//! In aggregation, participants never gossip recovered threshold signatures. Rather, they gossip [types::TipAck]s
46//! with partial signatures over some index and their latest tip. This approach reduces the overhead of running aggregation
47//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
48//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
49//! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest
50//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).
51
pub mod types;

// The engine and its supporting modules are only compiled for non-wasm targets;
// wasm32 consumers get just the wire/`types` module above.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Test-only mocks (Application, Reporter, Monitor, Supervisor) used by the
        // tests below.
        #[cfg(test)]
        pub mod mocks;
    }
}
67
68#[cfg(test)]
69mod tests {
70    use super::{mocks, types::Epoch, Config, Engine};
71    use crate::aggregation::mocks::Strategy;
72    use commonware_cryptography::{
73        bls12381::{
74            dkg::ops,
75            primitives::{
76                group::Share,
77                poly,
78                variant::{MinPk, MinSig, Variant},
79            },
80        },
81        ed25519::{PrivateKey, PublicKey},
82        sha256::Digest as Sha256Digest,
83        PrivateKeyExt as _, Signer as _,
84    };
85    use commonware_macros::{select, test_traced};
86    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87    use commonware_runtime::{
88        buffer::PoolRef,
89        deterministic::{self, Context},
90        Clock, Metrics, Runner, Spawner,
91    };
92    use commonware_utils::{NZUsize, NonZeroDuration};
93    use futures::{channel::oneshot, future::join_all};
94    use rand::{rngs::StdRng, Rng, SeedableRng};
95    use std::{
96        collections::{BTreeMap, HashMap},
97        num::NonZeroUsize,
98        sync::{Arc, Mutex},
99        time::Duration,
100    };
101    use tracing::debug;
102
    /// Map from a participant's public key to the (sender, receiver) channel pair
    /// it registered with the simulated network oracle.
    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;

    // Sizing for the journal buffer pool (see `PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE)`
    // in the engine configs below): page size in bytes and number of cached pages.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Reliable network link configuration for testing.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
114
115    /// Register all participants with the network oracle.
116    async fn register_participants(
117        oracle: &mut Oracle<PublicKey>,
118        participants: &[PublicKey],
119    ) -> Registrations<PublicKey> {
120        let mut registrations = BTreeMap::new();
121        for participant in participants.iter() {
122            let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
123            registrations.insert(participant.clone(), (sender, receiver));
124        }
125        registrations
126    }
127
128    /// Establish network links between all participants.
129    async fn link_participants(
130        oracle: &mut Oracle<PublicKey>,
131        participants: &[PublicKey],
132        link: Link,
133    ) {
134        for v1 in participants.iter() {
135            for v2 in participants.iter() {
136                if v2 == v1 {
137                    continue;
138                }
139                oracle
140                    .add_link(v1.clone(), v2.clone(), link.clone())
141                    .await
142                    .unwrap();
143            }
144        }
145    }
146
147    /// Initialize a simulated network environment.
148    async fn initialize_simulation(
149        context: Context,
150        num_validators: u32,
151        shares_vec: &mut [Share],
152        link: Link,
153    ) -> (
154        Oracle<PublicKey>,
155        Vec<(PublicKey, PrivateKey, Share)>,
156        Vec<PublicKey>,
157        Registrations<PublicKey>,
158    ) {
159        let (network, mut oracle) = Network::new(
160            context.with_label("network"),
161            commonware_p2p::simulated::Config {
162                max_size: 1024 * 1024,
163            },
164        );
165        network.start();
166
167        let mut schemes = (0..num_validators)
168            .map(|i| PrivateKey::from_seed(i as u64))
169            .collect::<Vec<_>>();
170        schemes.sort_by_key(|s| s.public_key());
171        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
172            .iter()
173            .enumerate()
174            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
175            .collect();
176        let pks = validators
177            .iter()
178            .map(|(pk, _, _)| pk.clone())
179            .collect::<Vec<_>>();
180
181        let registrations = register_participants(&mut oracle, &pks).await;
182        link_participants(&mut oracle, &pks, link).await;
183        (oracle, validators, pks, registrations)
184    }
185
    /// Spawn aggregation engines for all validators.
    ///
    /// For each validator this wires up a monitor, a supervisor seeded with the
    /// shared epoch (111) and polynomial, a mock application, a per-validator
    /// reporter, and an [Engine], then starts the engine on the validator's
    /// registered network channels.
    ///
    /// Validators whose position in `validators` appears in `incorrect` run
    /// `Strategy::Incorrect` (NOTE(review): presumably producing bad digests —
    /// confirm against `mocks::Application`); all others run `Strategy::Correct`.
    ///
    /// Returns the per-validator monitors, keyed by public key.
    #[allow(clippy::too_many_arguments)]
    fn spawn_validator_engines<V: Variant>(
        context: Context,
        polynomial: poly::Public<V>,
        validator_pks: &[PublicKey],
        validators: &[(PublicKey, PrivateKey, Share)],
        registrations: &mut Registrations<PublicKey>,
        automatons: &mut BTreeMap<PublicKey, mocks::Application>,
        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
        oracle: &mut Oracle<PublicKey>,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> HashMap<PublicKey, mocks::Monitor> {
        let mut monitors = HashMap::new();
        let namespace = b"my testing namespace";

        for (i, (validator, _, share)) in validators.iter().enumerate() {
            let context = context.with_label(&validator.to_string());
            // Every validator starts in the same epoch (111).
            let monitor = mocks::Monitor::new(111);
            monitors.insert(validator.clone(), monitor.clone());
            let supervisor = {
                let identity = *poly::public::<V>(&polynomial);
                let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                s.add_epoch(
                    111,
                    share.clone(),
                    polynomial.clone(),
                    validator_pks.to_vec(),
                );
                s
            };

            let blocker = oracle.control(validator.clone());

            // Select the application strategy by the validator's position.
            let automaton = mocks::Application::new(if incorrect.contains(&i) {
                Strategy::Incorrect
            } else {
                Strategy::Correct
            });
            automatons.insert(validator.clone(), automaton.clone());

            // Each validator gets its own reporter; its mailbox doubles as the
            // engine's reporter handle below.
            let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                namespace,
                validator_pks.len() as u32,
                polynomial.clone(),
            );
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(validator.clone(), reporter_mailbox);

            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    validators: supervisor,
                    automaton: automaton.clone(),
                    reporter: reporters.get(validator).unwrap().clone(),
                    blocker,
                    namespace: namespace.to_vec(),
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (1, 1),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: 100,
                    // Unique partition per validator so journals don't collide.
                    journal_partition: format!("aggregation/{validator}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            );

            // Hand the engine its registered network channels and start it.
            let (sender, receiver) = registrations.remove(validator).unwrap();
            engine.start((sender, receiver));
        }
        monitors
    }
264
265    /// Wait for all reporters to reach the specified consensus threshold.
266    async fn await_reporters<V: Variant>(
267        context: Context,
268        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
269        threshold_index: u64,
270        threshold_epoch: Epoch,
271    ) {
272        let mut receivers = Vec::new();
273        for (reporter, mailbox) in reporters.iter() {
274            // Create a oneshot channel to signal when the reporter has reached the threshold.
275            let (tx, rx) = oneshot::channel();
276            receivers.push(rx);
277
278            context.with_label("reporter_watcher").spawn({
279                let reporter = reporter.clone();
280                let mut mailbox = mailbox.clone();
281                move |context| async move {
282                    loop {
283                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
284                        debug!(
285                            index,
286                            epoch,
287                            threshold_index,
288                            threshold_epoch,
289                            ?reporter,
290                            "reporter status"
291                        );
292                        if index >= threshold_index && epoch >= threshold_epoch {
293                            debug!(
294                                ?reporter,
295                                "reporter reached threshold, signaling completion"
296                            );
297                            let _ = tx.send(reporter.clone());
298                            break;
299                        }
300                        context.sleep(Duration::from_millis(100)).await;
301                    }
302                }
303            });
304        }
305
306        // Wait for all oneshot receivers to complete.
307        let results = join_all(receivers).await;
308        assert_eq!(results.len(), reporters.len());
309
310        // Check that none were cancelled.
311        for result in results {
312            assert!(result.is_ok(), "reporter was cancelled");
313        }
314    }
315
316    /// Test aggregation consensus with all validators online.
317    fn all_online<V: Variant>() {
318        let num_validators: u32 = 4;
319        let quorum: u32 = 3;
320        let runner = deterministic::Runner::timed(Duration::from_secs(30));
321
322        runner.start(|mut context| async move {
323            let (polynomial, mut shares_vec) =
324                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
325            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
326
327            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
328                context.with_label("simulation"),
329                num_validators,
330                &mut shares_vec,
331                RELIABLE_LINK,
332            )
333            .await;
334            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
335            let mut reporters =
336                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
337            spawn_validator_engines::<V>(
338                context.with_label("validator"),
339                polynomial.clone(),
340                &pks,
341                &validators,
342                &mut registrations,
343                &mut automatons.lock().unwrap(),
344                &mut reporters,
345                &mut oracle,
346                Duration::from_secs(5),
347                vec![],
348            );
349            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
350        });
351    }
352
    /// Runs the all-online scenario for both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_all_online() {
        all_online::<MinPk>();
        all_online::<MinSig>();
    }
358
359    /// Test consensus resilience to Byzantine behavior.
360    fn byzantine_proposer<V: Variant>() {
361        let num_validators: u32 = 4;
362        let quorum: u32 = 3;
363        let runner = deterministic::Runner::timed(Duration::from_secs(30));
364
365        runner.start(|mut context| async move {
366            let (polynomial, mut shares_vec) =
367                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
368            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369
370            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
371                context.with_label("simulation"),
372                num_validators,
373                &mut shares_vec,
374                RELIABLE_LINK,
375            )
376            .await;
377            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
378            let mut reporters =
379                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
380
381            spawn_validator_engines::<V>(
382                context.with_label("validator"),
383                polynomial.clone(),
384                &pks,
385                &validators,
386                &mut registrations,
387                &mut automatons.lock().unwrap(),
388                &mut reporters,
389                &mut oracle,
390                Duration::from_secs(5),
391                vec![0],
392            );
393
394            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
395        });
396    }
397
    /// Runs the Byzantine-proposer scenario for both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        byzantine_proposer::<MinPk>();
        byzantine_proposer::<MinSig>();
    }
403
404    fn unclean_byzantine_shutdown<V: Variant>() {
405        // Test parameters
406        let num_validators: u32 = 4;
407        let quorum: u32 = 3;
408        let target_index = 200; // Target multiple rounds of signing
409        let min_shutdowns = 4; // Minimum number of shutdowns per validator
410        let max_shutdowns = 10; // Maximum number of shutdowns per validator
411        let shutdown_range_min = Duration::from_millis(100);
412        let shutdown_range_max = Duration::from_millis(1_000);
413
414        // Must be shorter than the maximum shutdown range to make progress after restarting
415        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
416
417        let mut prev_ctx = None;
418        let all_validators = Arc::new(Mutex::new(Vec::new()));
419
420        // Generate shares once
421        let mut rng = StdRng::seed_from_u64(0);
422        let (polynomial, mut shares_vec) =
423            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
424        let identity = *poly::public::<V>(&polynomial);
425        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
426
427        // Continue until shared reporter reaches target or max shutdowns exceeded
428        let mut shutdown_count = 0;
429        while shutdown_count < max_shutdowns {
430            let all_validators = all_validators.clone();
431            let mut shares_vec = shares_vec.clone();
432            let polynomial = polynomial.clone();
433            let f = move |mut context: Context| {
434                async move {
435                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
436                        context.with_label("simulation"),
437                        num_validators,
438                        &mut shares_vec,
439                        RELIABLE_LINK,
440                    )
441                    .await;
442                    // Store all validator public keys if not already done
443                    if all_validators.lock().unwrap().is_empty() {
444                        let mut pks_lock = all_validators.lock().unwrap();
445                        *pks_lock = pks.clone();
446                    }
447                    let automatons =
448                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
449
450                    // Use unique journal partitions for each validator to enable restart recovery
451                    let mut engine_monitors = HashMap::new();
452                    let namespace = b"my testing namespace";
453
454                    // Create a shared reporter
455                    //
456                    // We rely on replay to populate this reporter with a contiguous history of certificates.
457                    let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
458                        namespace,
459                        num_validators,
460                        polynomial.clone(),
461                    );
462                    context.with_label("reporter").spawn(|_| reporter.run());
463
464                    // Start validator engines
465                    for (i, (validator, _, share)) in validators.iter().enumerate() {
466                        let validator_context = context.with_label(&validator.to_string());
467                        let monitor = mocks::Monitor::new(111);
468                        engine_monitors.insert(validator.clone(), monitor.clone());
469                        let supervisor = {
470                            let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
471                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
472                            s
473                        };
474
475                        let blocker = oracle.control(validator.clone());
476                        let automaton = mocks::Application::new(if i == 0 {
477                            Strategy::Incorrect
478                        } else {
479                            Strategy::Correct
480                        });
481                        automatons
482                            .lock()
483                            .unwrap()
484                            .insert(validator.clone(), automaton.clone());
485
486                        let engine = Engine::new(
487                            validator_context.with_label("engine"),
488                            Config {
489                                monitor,
490                                validators: supervisor,
491                                automaton,
492                                reporter: reporter_mailbox.clone(),
493                                blocker,
494                                namespace: namespace.to_vec(),
495                                priority_acks: false,
496                                rebroadcast_timeout,
497                                epoch_bounds: (1, 1),
498                                window: std::num::NonZeroU64::new(10).unwrap(),
499                                activity_timeout: 1_024, // ensure we don't drop any certificates
500                                journal_partition: format!("unclean_shutdown_test/{validator}"),
501                                journal_write_buffer: NZUsize!(4096),
502                                journal_replay_buffer: NZUsize!(4096),
503                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
504                                journal_compression: Some(3),
505                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
506                            },
507                        );
508
509                        let (sender, receiver) = registrations.remove(validator).unwrap();
510                        engine.start((sender, receiver));
511                    }
512
513                    // Create a single completion watcher for the shared reporter
514                    let completion =
515                        context
516                            .with_label("completion_watcher")
517                            .spawn(move |context| async move {
518                                loop {
519                                    if let Some(tip_index) =
520                                        reporter_mailbox.get_contiguous_tip().await
521                                    {
522                                        if tip_index >= target_index {
523                                            break;
524                                        }
525                                    }
526                                    context.sleep(Duration::from_millis(50)).await;
527                                }
528                            });
529
530                    // Random shutdown timing to simulate unclean shutdown
531                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
532                    select! {
533                        _ = context.sleep(shutdown_wait) => {
534                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
535                            (false, context) // Unclean shutdown
536                        },
537                        _ = completion => {
538                            debug!("Shared reporter completed normally");
539                            (true, context) // Clean completion
540                        },
541                    }
542                }
543            };
544
545            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
546                debug!(shutdown_count, "Restarting from previous context");
547                deterministic::Runner::from(prev_ctx)
548            } else {
549                debug!("Starting initial run");
550                deterministic::Runner::timed(Duration::from_secs(45))
551            }
552            .start(f);
553            if complete && shutdown_count >= min_shutdowns {
554                debug!("Test completed successfully");
555                break;
556            }
557
558            prev_ctx = Some(context.recover());
559            shutdown_count += 1;
560        }
561    }
562
    /// Runs the unclean-shutdown scenario for both BLS12-381 variants.
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        unclean_byzantine_shutdown::<MinPk>();
        unclean_byzantine_shutdown::<MinSig>();
    }
568
569    fn unclean_shutdown_with_unsigned_index<V: Variant>() {
570        // Test parameters
571        let num_validators: u32 = 4;
572        let quorum: u32 = 3;
573        let skip_index = 50u64; // Index where no one will sign
574        let window = 10u64;
575        let target_index = 100u64;
576        let namespace = b"my testing namespace";
577
578        // Generate shares once
579        let all_validators = Arc::new(Mutex::new(Vec::new()));
580        let mut rng = StdRng::seed_from_u64(0);
581        let (polynomial, mut shares_vec) =
582            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
583        let identity = *poly::public::<V>(&polynomial);
584        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
585
586        // First run: let validators skip signing at skip_index and reach beyond it
587        let f = |context: Context| {
588            let all_validators_clone = all_validators.clone();
589            let mut shares_vec_clone = shares_vec.clone();
590            let polynomial_clone = polynomial.clone();
591            async move {
592                let (oracle, validators, pks, mut registrations) = initialize_simulation(
593                    context.with_label("simulation"),
594                    num_validators,
595                    &mut shares_vec_clone,
596                    RELIABLE_LINK,
597                )
598                .await;
599
600                // Store all validator public keys
601                {
602                    let mut pks_lock = all_validators_clone.lock().unwrap();
603                    *pks_lock = pks.clone();
604                }
605
606                // Create a shared reporter
607                let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
608                    namespace,
609                    num_validators,
610                    polynomial_clone.clone(),
611                );
612                context.with_label("reporter").spawn(|_| reporter.run());
613
614                // Start validator engines with NoSignature strategy for skip_index
615                let mut engine_monitors = HashMap::new();
616                let automatons =
617                    Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
618                for (validator, _, share) in validators.iter() {
619                    let validator_context = context.with_label(&validator.to_string());
620                    let monitor = mocks::Monitor::new(111);
621                    engine_monitors.insert(validator.clone(), monitor.clone());
622                    let supervisor = {
623                        let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
624                        s.add_epoch(111, share.clone(), polynomial_clone.clone(), pks.to_vec());
625                        s
626                    };
627                    let blocker = oracle.control(validator.clone());
628
629                    // All validators use NoSignature strategy for skip_index
630                    let automaton = mocks::Application::new(Strategy::Skip { index: skip_index });
631                    automatons
632                        .lock()
633                        .unwrap()
634                        .insert(validator.clone(), automaton.clone());
635
636                    let engine = Engine::new(
637                        validator_context.with_label("engine"),
638                        Config {
639                            monitor,
640                            validators: supervisor,
641                            automaton,
642                            reporter: reporter_mailbox.clone(),
643                            blocker,
644                            namespace: namespace.to_vec(),
645                            priority_acks: false,
646                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
647                                100,
648                            )),
649                            epoch_bounds: (1, 1),
650                            window: std::num::NonZeroU64::new(window).unwrap(),
651                            activity_timeout: 100,
652                            journal_partition: format!("unsigned_index_test/{validator}"),
653                            journal_write_buffer: NZUsize!(4096),
654                            journal_replay_buffer: NZUsize!(4096),
655                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
656                            journal_compression: Some(3),
657                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
658                        },
659                    );
660
661                    let (sender, receiver) = registrations.remove(validator).unwrap();
662                    engine.start((sender, receiver));
663                }
664
665                // Wait for validators to reach target_index (past skip_index)
666                loop {
667                    if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
668                        debug!(tip_index, skip_index, target_index, "reporter status");
669                        if tip_index >= skip_index + window - 1 {
670                            // max we can proceed before item confirmed
671                            return context;
672                        }
673                    }
674                    context.sleep(Duration::from_millis(50)).await;
675                }
676            }
677        };
678        let context = deterministic::Runner::timed(Duration::from_secs(60)).start(f);
679        let prev_ctx = context.recover();
680
681        // Second run: restart and verify the skip_index gets confirmed
682        let f2 = move |context: Context| {
683            async move {
684                let (oracle, validators, pks, mut registrations) = initialize_simulation(
685                    context.with_label("simulation"),
686                    num_validators,
687                    &mut shares_vec,
688                    RELIABLE_LINK,
689                )
690                .await;
691
692                // Create a shared reporter
693                let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
694                    namespace,
695                    num_validators,
696                    polynomial.clone(),
697                );
698                context.with_label("reporter").spawn(|_| reporter.run());
699
700                // Start validator engines with Correct strategy (will sign everything now)
701                let automatons =
702                    Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
703                for (validator, _, share) in validators.iter() {
704                    let validator_context = context.with_label(&validator.to_string());
705                    let monitor = mocks::Monitor::new(111);
706                    let supervisor = {
707                        let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
708                        s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
709                        s
710                    };
711
712                    let blocker = oracle.control(validator.clone());
713
714                    // Now all validators use Correct strategy
715                    let automaton = mocks::Application::new(Strategy::Correct);
716                    automatons
717                        .lock()
718                        .unwrap()
719                        .insert(validator.clone(), automaton.clone());
720
721                    let engine = Engine::new(
722                        validator_context.with_label("engine"),
723                        Config {
724                            monitor,
725                            validators: supervisor,
726                            automaton,
727                            reporter: reporter_mailbox.clone(),
728                            blocker,
729                            namespace: namespace.to_vec(),
730                            priority_acks: false,
731                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
732                                100,
733                            )),
734                            epoch_bounds: (1, 1),
735                            window: std::num::NonZeroU64::new(10).unwrap(),
736                            activity_timeout: 100,
737                            journal_partition: format!("unsigned_index_test/{validator}"),
738                            journal_write_buffer: NZUsize!(4096),
739                            journal_replay_buffer: NZUsize!(4096),
740                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
741                            journal_compression: Some(3),
742                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
743                        },
744                    );
745
746                    let (sender, receiver) = registrations.remove(validator).unwrap();
747                    engine.start((sender, receiver));
748                }
749
750                // Wait for skip_index to be confirmed (should happen on replay)
751                loop {
752                    if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
753                        debug!(
754                            tip_index,
755                            skip_index, target_index, "reporter status on restart",
756                        );
757                        if tip_index >= target_index {
758                            break;
759                        }
760                    }
761                    context.sleep(Duration::from_millis(50)).await;
762                }
763            }
764        };
765        deterministic::Runner::from(prev_ctx).start(f2);
766    }
767
    /// Runs the unclean-shutdown/unsigned-index scenario for both BLS variants.
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_index() {
        unclean_shutdown_with_unsigned_index::<MinPk>();
        unclean_shutdown_with_unsigned_index::<MinSig>();
    }
773
774    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
775        let num_validators: u32 = 4;
776        let quorum: u32 = 3;
777        let cfg = deterministic::Config::new()
778            .with_seed(seed)
779            .with_timeout(Some(Duration::from_secs(120)));
780        let runner = deterministic::Runner::new(cfg);
781
782        runner.start(|mut context| async move {
783            let (polynomial, mut shares_vec) =
784                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
785            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
786
787            // Use degraded network links with realistic conditions
788            let degraded_link = Link {
789                latency: Duration::from_millis(200),
790                jitter: Duration::from_millis(150),
791                success_rate: 0.5,
792            };
793
794            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
795                context.with_label("simulation"),
796                num_validators,
797                &mut shares_vec,
798                degraded_link,
799            )
800            .await;
801            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
802            let mut reporters =
803                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
804
805            spawn_validator_engines::<V>(
806                context.with_label("validator"),
807                polynomial.clone(),
808                &pks,
809                &validators,
810                &mut registrations,
811                &mut automatons.lock().unwrap(),
812                &mut reporters,
813                &mut oracle,
814                Duration::from_secs(2),
815                vec![],
816            );
817
818            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
819
820            context.auditor().state()
821        })
822    }
823
    /// Runs the slow-and-lossy-links scenario (seed 0) for both BLS variants.
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links::<MinPk>(0);
        slow_and_lossy_links::<MinSig>(0);
    }
829
830    #[test_traced("INFO")]
831    fn test_determinism() {
832        // We use slow and lossy links as the deterministic test
833        // because it is the most complex test.
834        for seed in 1..6 {
835            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
836            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
837            assert_eq!(pk_state_1, pk_state_2);
838
839            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
840            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
841            assert_eq!(sig_state_1, sig_state_2);
842
843            // Sanity check that different types can't be identical.
844            assert_ne!(pk_state_1, sig_state_1);
845        }
846    }
847
848    fn one_offline<V: Variant>() {
849        let num_validators: u32 = 5;
850        let quorum: u32 = 3;
851        let runner = deterministic::Runner::timed(Duration::from_secs(30));
852
853        runner.start(|mut context| async move {
854            let (polynomial, mut shares_vec) =
855                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
856            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
857
858            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
859                context.with_label("simulation"),
860                num_validators,
861                &mut shares_vec,
862                RELIABLE_LINK,
863            )
864            .await;
865            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
866            let mut reporters =
867                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
868
869            // Start only 4 out of 5 validators (one offline)
870            let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
871            let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
872
873            spawn_validator_engines::<V>(
874                context.with_label("validator"),
875                polynomial.clone(),
876                &online_pks,
877                &online_validators,
878                &mut registrations,
879                &mut automatons.lock().unwrap(),
880                &mut reporters,
881                &mut oracle,
882                Duration::from_secs(5),
883                vec![],
884            );
885            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
886        });
887    }
888
    /// Runs the one-validator-offline scenario for both BLS variants.
    #[test_traced("INFO")]
    fn test_one_offline() {
        one_offline::<MinPk>();
        one_offline::<MinSig>();
    }
894
895    /// Test consensus recovery after a network partition.
896    fn network_partition<V: Variant>() {
897        let num_validators: u32 = 4;
898        let quorum: u32 = 3;
899        let runner = deterministic::Runner::timed(Duration::from_secs(60));
900
901        runner.start(|mut context| async move {
902            let (polynomial, mut shares_vec) =
903                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
904            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
905
906            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
907                context.with_label("simulation"),
908                num_validators,
909                &mut shares_vec,
910                RELIABLE_LINK,
911            )
912            .await;
913            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
914            let mut reporters =
915                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
916
917            spawn_validator_engines::<V>(
918                context.with_label("validator"),
919                polynomial.clone(),
920                &pks,
921                &validators,
922                &mut registrations,
923                &mut automatons.lock().unwrap(),
924                &mut reporters,
925                &mut oracle,
926                Duration::from_secs(5),
927                vec![],
928            );
929
930            for v1 in pks.iter() {
931                for v2 in pks.iter() {
932                    if v2 == v1 {
933                        continue;
934                    }
935                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
936                }
937            }
938            context.sleep(Duration::from_secs(20)).await;
939
940            let link = Link {
941                latency: Duration::from_millis(10),
942                jitter: Duration::from_millis(1),
943                success_rate: 1.0,
944            };
945            for v1 in pks.iter() {
946                for v2 in pks.iter() {
947                    if v2 == v1 {
948                        continue;
949                    }
950                    oracle
951                        .add_link(v1.clone(), v2.clone(), link.clone())
952                        .await
953                        .unwrap();
954                }
955            }
956
957            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
958        });
959    }
960
    /// Runs the network-partition recovery scenario for both BLS variants.
    #[test_traced("INFO")]
    fn test_network_partition() {
        network_partition::<MinPk>();
        network_partition::<MinSig>();
    }
966
    /// Test insufficient validator participation (below quorum).
    ///
    /// Starts only 2 of 5 validators (quorum is 3) and asserts that no
    /// reporter ever observes a confirmed tip. Engines are constructed by
    /// hand here (rather than via `spawn_validator_engines`) so the test
    /// controls exactly which validators participate.
    fn insufficient_validators<V: Variant>() {
        let num_validators: u32 = 5;
        let quorum: u32 = 3;
        let runner = deterministic::Runner::timed(Duration::from_secs(15));

        runner.start(|mut context| async move {
            // Deal threshold shares and order them by participant index.
            let (polynomial, mut shares_vec) =
                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
            let identity = *poly::public::<V>(&polynomial);

            let (oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &mut shares_vec,
                RELIABLE_LINK,
            )
            .await;
            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

            // Start only 2 out of 5 validators (below quorum of 3)
            let namespace = b"my testing namespace";
            for (validator, _scheme, share) in validators.iter().take(2) {
                let context = context.with_label(&validator.to_string());
                // Epoch 111 matches the epoch used by the other tests in this module.
                let monitor = mocks::Monitor::new(111);
                let supervisor = {
                    let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
                    s.add_epoch(
                        111,
                        share.clone(),
                        polynomial.clone(),
                        pks.to_vec(),
                    );
                    s
                };

                let blocker = oracle.control(validator.clone());

                // Honest signing strategy; the shortfall is purely in participation.
                let automaton = mocks::Application::new(Strategy::Correct);
                automatons.lock().unwrap().insert(validator.clone(), automaton.clone());

                // Each online validator gets its own reporter to track (non-)progress.
                let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
                    namespace,
                    pks.len() as u32,
                    polynomial.clone(),
                );
                context.with_label("reporter").spawn(|_| reporter.run());
                reporters.insert(validator.clone(), reporter_mailbox);

                let engine = Engine::new(
                    context.with_label("engine"),
                    Config {
                        monitor,
                        validators: supervisor,
                        automaton: automaton.clone(),
                        reporter: reporters.get(validator).unwrap().clone(),
                        blocker,
                        namespace: namespace.to_vec(),
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                        epoch_bounds: (1, 1),
                        window: std::num::NonZeroU64::new(10).unwrap(),
                        activity_timeout: 100,
                        // Per-validator journal partition so replays don't collide.
                        journal_partition: format!("aggregation/{validator}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    },
                );

                let (sender, receiver) = registrations.remove(validator).unwrap();
                engine.start((sender, receiver));
            }

            // With insufficient validators, consensus should not be achievable
            // Wait long enough for any potential consensus attempts to complete
            context.sleep(Duration::from_secs(12)).await;

            // Check that no validator achieved consensus through verified threshold signatures
            let mut any_consensus = false;
            for (validator_pk, mut reporter_mailbox) in reporters {
                // A missing tip means no confirmation was ever observed.
                let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
                if tip > 0 {
                    any_consensus = true;
                    tracing::warn!(
                        ?validator_pk,
                        tip,
                        "Unexpected threshold signature consensus with insufficient validators"
                    );
                }
            }

            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
            assert!(
                !any_consensus,
                "Consensus should not be achieved with insufficient validator participation (below quorum)"
            );
        });
    }
1071
    /// Runs the below-quorum participation scenario for both BLS variants.
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        insufficient_validators::<MinPk>();
        insufficient_validators::<MinSig>();
    }
1077}