commonware_consensus/aggregation/
mod.rs

//! Recover threshold signatures over an externally synchronized sequence of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce threshold signatures
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
//! _For applications that want to collect threshold signatures over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
13//!
14//! # Architecture
15//!
16//! The core of the module is the [Engine]. It manages the agreement process by:
17//! - Requesting externally synchronized [commonware_cryptography::Digest]s
18//! - Signing said digests with BLS [commonware_cryptography::bls12381::primitives::poly::PartialSignature]
19//! - Multicasting partial signatures to other validators
20//! - Recovering [commonware_cryptography::bls12381::primitives::poly::Signature]s from a quorum of partial signatures
21//! - Monitoring recovery progress and notifying the application layer of recoveries
22//!
23//! The engine interacts with four main components:
24//! - [crate::Automaton]: Provides external digests
25//! - [crate::Reporter]: Receives agreement confirmations
26//! - [crate::Monitor]: Tracks epoch transitions
27//! - [crate::ThresholdSupervisor]: Manages validator sets and network identities
28//!
29//! # Design Decisions
30//!
31//! ## Missing Signature Resolution
32//!
//! The engine does not try to "fill gaps" when threshold signatures are missing. When validators
//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
//! signatures may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the threshold signature for any skipped index. This design
37//! is intentional to prioritize the creation of threshold signatures as fast as possible. By
38//! advancing the tip, honest validators can continue producing threshold signatures for new
39//! indices rather than getting stuck trying to backfill missing signatures. Validators who are
40//! online and honest can maintain consensus even when others fall behind or go offline.
41//! Backfilling missing signatures is left to other parts of the application that can implement
42//! appropriate recovery strategies.
43
44pub mod types;
45
46cfg_if::cfg_if! {
47    if #[cfg(not(target_arch = "wasm32"))] {
48        mod config;
49        pub use config::Config;
50        mod engine;
51        pub use engine::Engine;
52        mod metrics;
53        mod safe_tip;
54
55        #[cfg(test)]
56        pub mod mocks;
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::{mocks, types::Epoch, Config, Engine};
63    use commonware_codec::Encode;
64    use commonware_cryptography::{
65        bls12381::{
66            dkg::ops,
67            primitives::{
68                group::Share,
69                ops as bls_ops, poly,
70                variant::{MinPk, MinSig, Variant},
71            },
72        },
73        ed25519::{PrivateKey, PublicKey},
74        sha256::Digest as Sha256Digest,
75        PrivateKeyExt as _, Signer as _,
76    };
77    use commonware_macros::{select, test_traced};
78    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
79    use commonware_runtime::{
80        buffer::PoolRef,
81        deterministic::{self, Context},
82        Clock, Metrics, Runner, Spawner,
83    };
84    use commonware_utils::{NZUsize, NonZeroDuration};
85    use futures::{channel::oneshot, future::join_all};
86    use rand::{rngs::StdRng, Rng, SeedableRng};
87    use std::{
88        collections::{BTreeMap, HashMap},
89        num::NonZeroUsize,
90        sync::{Arc, Mutex},
91        time::Duration,
92    };
93    use tracing::debug;
94
95    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
96
97    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
98    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
99
100    /// Reliable network link configuration for testing.
101    const RELIABLE_LINK: Link = Link {
102        latency: 10.0,
103        jitter: 1.0,
104        success_rate: 1.0,
105    };
106
107    /// Register all participants with the network oracle.
108    async fn register_participants(
109        oracle: &mut Oracle<PublicKey>,
110        participants: &[PublicKey],
111    ) -> Registrations<PublicKey> {
112        let mut registrations = BTreeMap::new();
113        for participant in participants.iter() {
114            let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
115            registrations.insert(participant.clone(), (sender, receiver));
116        }
117        registrations
118    }
119
120    /// Establish network links between all participants.
121    async fn link_participants(
122        oracle: &mut Oracle<PublicKey>,
123        participants: &[PublicKey],
124        link: Link,
125    ) {
126        for v1 in participants.iter() {
127            for v2 in participants.iter() {
128                if v2 == v1 {
129                    continue;
130                }
131                oracle
132                    .add_link(v1.clone(), v2.clone(), link.clone())
133                    .await
134                    .unwrap();
135            }
136        }
137    }
138
139    /// Initialize a simulated network environment.
140    async fn initialize_simulation(
141        context: Context,
142        num_validators: u32,
143        shares_vec: &mut [Share],
144        link: Link,
145    ) -> (
146        Oracle<PublicKey>,
147        Vec<(PublicKey, PrivateKey, Share)>,
148        Vec<PublicKey>,
149        Registrations<PublicKey>,
150    ) {
151        let (network, mut oracle) = Network::new(
152            context.with_label("network"),
153            commonware_p2p::simulated::Config {
154                max_size: 1024 * 1024,
155            },
156        );
157        network.start();
158
159        let mut schemes = (0..num_validators)
160            .map(|i| PrivateKey::from_seed(i as u64))
161            .collect::<Vec<_>>();
162        schemes.sort_by_key(|s| s.public_key());
163        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
164            .iter()
165            .enumerate()
166            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
167            .collect();
168        let pks = validators
169            .iter()
170            .map(|(pk, _, _)| pk.clone())
171            .collect::<Vec<_>>();
172
173        let registrations = register_participants(&mut oracle, &pks).await;
174        link_participants(&mut oracle, &pks, link).await;
175        (oracle, validators, pks, registrations)
176    }
177
178    /// Spawn aggregation engines for all validators.
179    #[allow(clippy::too_many_arguments)]
180    fn spawn_validator_engines<V: Variant>(
181        context: Context,
182        polynomial: poly::Public<V>,
183        validator_pks: &[PublicKey],
184        validators: &[(PublicKey, PrivateKey, Share)],
185        registrations: &mut Registrations<PublicKey>,
186        automatons: &mut BTreeMap<PublicKey, mocks::Application>,
187        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
188        oracle: &mut Oracle<PublicKey>,
189        rebroadcast_timeout: Duration,
190        invalid_when: fn(u64) -> bool,
191    ) -> HashMap<PublicKey, mocks::Monitor> {
192        let mut monitors = HashMap::new();
193        let namespace = b"my testing namespace";
194        for (validator, _scheme, share) in validators.iter() {
195            let context = context.with_label(&validator.to_string());
196            let monitor = mocks::Monitor::new(111);
197            monitors.insert(validator.clone(), monitor.clone());
198            let supervisor = {
199                let mut s = mocks::Supervisor::<PublicKey, V>::default();
200                s.add_epoch(
201                    111,
202                    share.clone(),
203                    polynomial.clone(),
204                    validator_pks.to_vec(),
205                );
206                s
207            };
208
209            let blocker = oracle.control(validator.clone());
210
211            let automaton = mocks::Application::new(invalid_when);
212            automatons.insert(validator.clone(), automaton.clone());
213
214            let (reporter, reporter_mailbox) =
215                mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
216            context.with_label("reporter").spawn(|_| reporter.run());
217            reporters.insert(validator.clone(), reporter_mailbox);
218
219            let engine = Engine::new(
220                context.with_label("engine"),
221                Config {
222                    monitor,
223                    validators: supervisor,
224                    automaton: automaton.clone(),
225                    reporter: reporters.get(validator).unwrap().clone(),
226                    blocker,
227                    namespace: namespace.to_vec(),
228                    priority_acks: false,
229                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
230                    epoch_bounds: (1, 1),
231                    window: std::num::NonZeroU64::new(10).unwrap(),
232                    journal_partition: format!("aggregation/{validator}"),
233                    journal_write_buffer: NZUsize!(4096),
234                    journal_replay_buffer: NZUsize!(4096),
235                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
236                    journal_compression: Some(3),
237                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
238                },
239            );
240
241            let (sender, receiver) = registrations.remove(validator).unwrap();
242            engine.start((sender, receiver));
243        }
244        monitors
245    }
246
247    /// Wait for all reporters to reach the specified consensus threshold.
248    async fn await_reporters<V: Variant>(
249        context: Context,
250        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
251        threshold_index: u64,
252        threshold_epoch: Epoch,
253    ) {
254        let mut receivers = Vec::new();
255        for (reporter, mailbox) in reporters.iter() {
256            // Create a oneshot channel to signal when the reporter has reached the threshold.
257            let (tx, rx) = oneshot::channel();
258            receivers.push(rx);
259
260            context.with_label("reporter_watcher").spawn({
261                let reporter = reporter.clone();
262                let mut mailbox = mailbox.clone();
263                move |context| async move {
264                    loop {
265                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
266                        let contiguous_index = mailbox.get_contiguous_tip().await.unwrap_or(0);
267                        debug!(
268                            index,
269                            epoch,
270                            contiguous_index,
271                            threshold_index,
272                            threshold_epoch,
273                            ?reporter,
274                            "reporter status"
275                        );
276                        if contiguous_index >= threshold_index && epoch >= threshold_epoch {
277                            debug!(
278                                ?reporter,
279                                "reporter reached threshold, signaling completion"
280                            );
281                            let _ = tx.send(reporter.clone());
282                            break;
283                        }
284                        context.sleep(Duration::from_millis(100)).await;
285                    }
286                }
287            });
288        }
289
290        // Wait for all oneshot receivers to complete.
291        let results = join_all(receivers).await;
292        assert_eq!(results.len(), reporters.len());
293
294        // Check that none were cancelled.
295        for result in results {
296            assert!(result.is_ok(), "reporter was cancelled");
297        }
298    }
299
300    /// Test aggregation consensus with all validators online.
301    fn all_online<V: Variant>() {
302        let num_validators: u32 = 4;
303        let quorum: u32 = 3;
304        let runner = deterministic::Runner::timed(Duration::from_secs(30));
305
306        runner.start(|mut context| async move {
307            let (polynomial, mut shares_vec) =
308                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
309            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
310
311            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
312                context.with_label("simulation"),
313                num_validators,
314                &mut shares_vec,
315                RELIABLE_LINK,
316            )
317            .await;
318            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
319            let mut reporters =
320                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
321            spawn_validator_engines::<V>(
322                context.with_label("validator"),
323                polynomial.clone(),
324                &pks,
325                &validators,
326                &mut registrations,
327                &mut automatons.lock().unwrap(),
328                &mut reporters,
329                &mut oracle,
330                Duration::from_secs(5),
331                |_| false,
332            );
333            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
334        });
335    }
336
337    #[test_traced]
338    fn test_all_online() {
339        all_online::<MinPk>();
340        all_online::<MinSig>();
341    }
342
343    fn unclean_shutdown<V: Variant>() {
344        // Test parameters
345        let num_validators: u32 = 4;
346        let quorum: u32 = 3;
347        let target_index = 200; // Target multiple rounds of signing
348        let max_shutdowns = 10; // Maximum number of shutdowns per validator
349        let min_shutdowns = 4; // Minimum number of shutdowns per validator
350        let shutdown_range_min = Duration::from_millis(100);
351        let shutdown_range_max = Duration::from_millis(1_000);
352
353        // Must be shorter than the maximum shutdown range to make progress after restarting
354        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
355
356        let mut prev_ctx = None;
357        let shutdown_counts = Arc::new(Mutex::new(HashMap::<PublicKey, u32>::new()));
358        let completed_validators = Arc::new(Mutex::new(std::collections::HashSet::new()));
359        let all_validators = Arc::new(Mutex::new(Vec::new()));
360
361        // Generate shares once
362        let mut rng = StdRng::seed_from_u64(0);
363        let (polynomial, mut shares_vec) =
364            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
365        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
366
367        // Continue until all validators reach target or max shutdowns exceeded
368        while completed_validators.lock().unwrap().len() < num_validators as usize
369            && shutdown_counts.lock().unwrap().values().max().unwrap_or(&0) < &max_shutdowns
370        {
371            let completed_clone = completed_validators.clone();
372            let shutdown_counts_clone = shutdown_counts.clone();
373            let all_validators_clone = all_validators.clone();
374            let shares_vec_clone = shares_vec.clone();
375            let polynomial_clone = polynomial.clone();
376
377            let f = move |mut context: Context| {
378                let completed = completed_clone;
379                let shutdown_counts = shutdown_counts_clone;
380                let all_validators = all_validators_clone;
381                let mut shares_vec = shares_vec_clone;
382                let polynomial = polynomial_clone;
383                async move {
384                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
385                        context.with_label("simulation"),
386                        num_validators,
387                        &mut shares_vec,
388                        RELIABLE_LINK,
389                    )
390                    .await;
391                    // Store all validator public keys if not already done
392                    if all_validators.lock().unwrap().is_empty() {
393                        let mut pks_lock = all_validators.lock().unwrap();
394                        *pks_lock = pks.clone();
395                    }
396                    let automatons =
397                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
398                    let mut reporters =
399                        BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
400
401                    // Use unique journal partitions for each validator to enable restart recovery
402                    let mut engine_monitors = HashMap::new();
403                    let namespace = b"my testing namespace";
404                    for (validator, _scheme, share) in validators.iter() {
405                        let validator_context = context.with_label(&validator.to_string());
406                        let monitor = mocks::Monitor::new(111);
407                        engine_monitors.insert(validator.clone(), monitor.clone());
408                        let supervisor = {
409                            let mut s = mocks::Supervisor::<PublicKey, V>::default();
410                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
411                            s
412                        };
413
414                        let blocker = oracle.control(validator.clone());
415                        let automaton = mocks::Application::new(|_| false);
416                        automatons
417                            .lock()
418                            .unwrap()
419                            .insert(validator.clone(), automaton.clone());
420
421                        let (reporter, reporter_mailbox) =
422                            mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
423                        validator_context
424                            .with_label("reporter")
425                            .spawn(|_| reporter.run());
426                        reporters.insert(validator.clone(), reporter_mailbox);
427
428                        let engine = Engine::new(
429                            validator_context.with_label("engine"),
430                            Config {
431                                monitor,
432                                validators: supervisor,
433                                automaton,
434                                reporter: reporters.get(validator).unwrap().clone(),
435                                blocker,
436                                namespace: namespace.to_vec(),
437                                priority_acks: false,
438                                rebroadcast_timeout,
439                                epoch_bounds: (1, 1),
440                                window: std::num::NonZeroU64::new(10).unwrap(),
441                                // Use validator-specific partition for journal recovery
442                                journal_partition: format!("unclean_shutdown_test/{validator}"),
443                                journal_write_buffer: NZUsize!(4096),
444                                journal_replay_buffer: NZUsize!(4096),
445                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
446                                journal_compression: Some(3),
447                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
448                            },
449                        );
450
451                        let (sender, receiver) = registrations.remove(validator).unwrap();
452                        engine.start((sender, receiver));
453                    }
454
455                    // Create completion watchers for all validators
456                    let mut completion_tasks = Vec::new();
457                    for (validator_pk, mut reporter_mailbox) in reporters.clone() {
458                        let validator = validator_pk.clone();
459                        let completed_ref = completed.clone();
460                        let task = context.with_label("completion_watcher").spawn(
461                            move |context| async move {
462                                loop {
463                                    if let Some((tip_index, _epoch)) =
464                                        reporter_mailbox.get_tip().await
465                                    {
466                                        if tip_index >= target_index {
467                                            // Verify that validator has signed messages at all indices
468                                            for check_index in 0..=tip_index {
469                                                if let Some((digest, epoch)) =
470                                                    reporter_mailbox.get(check_index).await
471                                                {
472                                                    assert_eq!(
473                                                        epoch, 111,
474                                                        "Epoch should be consistent"
475                                                    );
476                                                    debug!(
477                                                        ?validator,
478                                                        check_index,
479                                                        ?digest,
480                                                        "Verified validator signed message"
481                                                    );
482                                                }
483                                            }
484                                            completed_ref.lock().unwrap().insert(validator.clone());
485                                            debug!(
486                                                ?validator,
487                                                tip_index, "Validator completed signing target"
488                                            );
489                                            break;
490                                        }
491                                    }
492                                    context.sleep(Duration::from_millis(50)).await;
493                                }
494                            },
495                        );
496                        completion_tasks.push(task);
497                    }
498
499                    // Random shutdown timing to simulate unclean shutdown
500                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
501
502                    select! {
503                        _ = context.sleep(shutdown_wait) => {
504                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
505                            // Track which validators were running when shutdown occurred
506                            let mut counts = shutdown_counts.lock().unwrap();
507                            for (pk, _) in reporters {
508                                if !completed.lock().unwrap().contains(&pk) {
509                                    *counts.entry(pk).or_insert(0) += 1;
510                                }
511                            }
512                            (false, context) // Unclean shutdown
513                        },
514                        _ = join_all(completion_tasks) => {
515                            debug!("All validators completed normally");
516                            (true, context) // Clean completion
517                        }
518                    }
519                }
520            };
521
522            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
523                let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
524                debug!(shutdown_count, "Restarting from previous context");
525                deterministic::Runner::from(prev_ctx)
526            } else {
527                debug!("Starting initial run");
528                deterministic::Runner::timed(Duration::from_secs(45))
529            }
530            .start(f);
531
532            prev_ctx = Some(context.recover());
533
534            if complete {
535                debug!("Test completed successfully");
536                break;
537            }
538
539            let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
540            debug!(
541                shutdown_count,
542                completed = completed_validators.lock().unwrap().len(),
543                "Shutdown occurred, restarting"
544            );
545        }
546
547        // Verify that all validators eventually reached the target
548        let final_completed = completed_validators.lock().unwrap().len();
549        let total_shutdowns = shutdown_counts.lock().unwrap().values().sum::<u32>();
550        assert_eq!(
551            final_completed, num_validators as usize,
552            "All validators should reach target index {target_index} despite unclean shutdowns. Only {final_completed} completed after {total_shutdowns} shutdowns"
553        );
554
555        // Verify that each validator experienced a minimum number of shutdowns
556        let counts = shutdown_counts.lock().unwrap();
557        for pk in all_validators.lock().unwrap().iter() {
558            let count = counts.get(pk).copied().unwrap_or(0);
559            assert!(
560                count >= min_shutdowns,
561                "Validator {pk:?} should have at least {min_shutdowns} shutdowns, but had {count}"
562            );
563        }
564
565        debug!(
566            total_shutdowns,
567            target_index, "Unclean shutdown test completed successfully"
568        );
569    }
570
571    #[test_traced]
572    fn test_unclean_shutdown() {
573        unclean_shutdown::<MinPk>();
574        unclean_shutdown::<MinSig>();
575    }
576
577    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
578        let num_validators: u32 = 4;
579        let quorum: u32 = 3;
580        let cfg = deterministic::Config::new()
581            .with_seed(seed)
582            .with_timeout(Some(Duration::from_secs(120)));
583        let runner = deterministic::Runner::new(cfg);
584
585        runner.start(|mut context| async move {
586            let (polynomial, mut shares_vec) =
587                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
588            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
589
590            // Use degraded network links with realistic conditions
591            let degraded_link = Link {
592                latency: 200.0,
593                jitter: 150.0,
594                success_rate: 0.5,
595            };
596
597            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
598                context.with_label("simulation"),
599                num_validators,
600                &mut shares_vec,
601                degraded_link,
602            )
603            .await;
604            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
605            let mut reporters =
606                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
607
608            spawn_validator_engines::<V>(
609                context.with_label("validator"),
610                polynomial.clone(),
611                &pks,
612                &validators,
613                &mut registrations,
614                &mut automatons.lock().unwrap(),
615                &mut reporters,
616                &mut oracle,
617                Duration::from_secs(2),
618                |_| false,
619            );
620
621            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
622
623            context.auditor().state()
624        })
625    }
626
627    #[test_traced]
628    fn test_slow_and_lossy_links() {
629        slow_and_lossy_links::<MinPk>(0);
630        slow_and_lossy_links::<MinSig>(0);
631    }
632
633    #[test_traced]
634    fn test_determinism() {
635        // We use slow and lossy links as the deterministic test
636        // because it is the most complex test.
637        for seed in 1..6 {
638            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
639            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
640            assert_eq!(pk_state_1, pk_state_2);
641
642            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
643            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
644            assert_eq!(sig_state_1, sig_state_2);
645
646            // Sanity check that different types can't be identical.
647            assert_ne!(pk_state_1, sig_state_1);
648        }
649    }
650
651    fn one_offline<V: Variant>() {
652        let num_validators: u32 = 5;
653        let quorum: u32 = 3;
654        let runner = deterministic::Runner::timed(Duration::from_secs(30));
655
656        runner.start(|mut context| async move {
657            let (polynomial, mut shares_vec) =
658                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
659            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
660
661            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
662                context.with_label("simulation"),
663                num_validators,
664                &mut shares_vec,
665                RELIABLE_LINK,
666            )
667            .await;
668            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
669            let mut reporters =
670                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
671
672            // Start only 4 out of 5 validators (one offline)
673            let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
674            let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
675
676            spawn_validator_engines::<V>(
677                context.with_label("validator"),
678                polynomial.clone(),
679                &online_pks,
680                &online_validators,
681                &mut registrations,
682                &mut automatons.lock().unwrap(),
683                &mut reporters,
684                &mut oracle,
685                Duration::from_secs(5),
686                |_| false,
687            );
688            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
689        });
690    }
691
692    #[test_traced]
693    fn test_one_offline() {
694        one_offline::<MinPk>();
695        one_offline::<MinSig>();
696    }
697
698    /// Test that consensus can be reached starting from index 0.
699    fn consensus_from_index_zero<V: Variant>() {
700        let num_validators: u32 = 4;
701        let quorum: u32 = 3;
702        let runner = deterministic::Runner::timed(Duration::from_secs(30));
703
704        runner.start(|mut context| async move {
705            let (polynomial, mut shares_vec) =
706                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
707            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
708
709            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
710                context.with_label("simulation"),
711                num_validators,
712                &mut shares_vec,
713                RELIABLE_LINK,
714            )
715            .await;
716            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
717            let mut reporters =
718                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
719
720            spawn_validator_engines::<V>(
721                context.with_label("validator"),
722                polynomial.clone(),
723                &pks,
724                &validators,
725                &mut registrations,
726                &mut automatons.lock().unwrap(),
727                &mut reporters,
728                &mut oracle,
729                Duration::from_secs(5),
730                |_| false,
731            );
732
733            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
734        });
735    }
736
737    #[test_traced]
738    fn test_consensus_from_index_zero() {
739        consensus_from_index_zero::<MinPk>();
740        consensus_from_index_zero::<MinSig>();
741    }
742
743    /// Test consensus recovery after a network partition.
744    fn network_partition<V: Variant>() {
745        let num_validators: u32 = 4;
746        let quorum: u32 = 3;
747        let runner = deterministic::Runner::timed(Duration::from_secs(60));
748
749        runner.start(|mut context| async move {
750            let (polynomial, mut shares_vec) =
751                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
752            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
753
754            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
755                context.with_label("simulation"),
756                num_validators,
757                &mut shares_vec,
758                RELIABLE_LINK,
759            )
760            .await;
761            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
762            let mut reporters =
763                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
764
765            spawn_validator_engines::<V>(
766                context.with_label("validator"),
767                polynomial.clone(),
768                &pks,
769                &validators,
770                &mut registrations,
771                &mut automatons.lock().unwrap(),
772                &mut reporters,
773                &mut oracle,
774                Duration::from_secs(5),
775                |_| false,
776            );
777
778            for v1 in pks.iter() {
779                for v2 in pks.iter() {
780                    if v2 == v1 {
781                        continue;
782                    }
783                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
784                }
785            }
786            context.sleep(Duration::from_secs(20)).await;
787
788            let link = Link {
789                latency: 10.0,
790                jitter: 1.0,
791                success_rate: 1.0,
792            };
793            for v1 in pks.iter() {
794                for v2 in pks.iter() {
795                    if v2 == v1 {
796                        continue;
797                    }
798                    oracle
799                        .add_link(v1.clone(), v2.clone(), link.clone())
800                        .await
801                        .unwrap();
802                }
803            }
804
805            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
806        });
807    }
808
809    #[test_traced]
810    fn test_network_partition() {
811        network_partition::<MinPk>();
812        network_partition::<MinSig>();
813    }
814
815    /// Test consensus resilience to Byzantine behavior.
816    fn invalid_signature_injection<V: Variant>() {
817        let num_validators: u32 = 4;
818        let quorum: u32 = 3;
819        let runner = deterministic::Runner::timed(Duration::from_secs(30));
820
821        runner.start(|mut context| async move {
822            let (polynomial, mut shares_vec) =
823                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
824            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
825
826            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
827                context.with_label("simulation"),
828                num_validators,
829                &mut shares_vec,
830                RELIABLE_LINK,
831            )
832            .await;
833            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
834            let mut reporters =
835                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
836
837            // Simulate more realistic Byzantine behavior with pseudo-random faults
838            // Using a deterministic seed based on index for reproducible tests
839            let byzantine_fault_fn = |index: u64| -> bool {
840                use std::{
841                    collections::hash_map::DefaultHasher,
842                    hash::{Hash, Hasher},
843                };
844
845                let mut hasher = DefaultHasher::new();
846                index.hash(&mut hasher);
847                let hash_value = hasher.finish();
848
849                // Create Byzantine faults with ~5% probability using deterministic hash
850                // This simulates realistic sporadic Byzantine behavior
851                (hash_value % 100) < 5
852            };
853
854            spawn_validator_engines::<V>(
855                context.with_label("validator"),
856                polynomial.clone(),
857                &pks,
858                &validators,
859                &mut registrations,
860                &mut automatons.lock().unwrap(),
861                &mut reporters,
862                &mut oracle,
863                Duration::from_secs(5),
864                byzantine_fault_fn,
865            );
866
867            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
868        });
869    }
870
871    #[test_traced]
872    fn test_invalid_signature_injection() {
873        invalid_signature_injection::<MinPk>();
874        invalid_signature_injection::<MinSig>();
875    }
876
877    /// Test that verifies cryptographic signatures are properly validated.
878    fn cryptographic_validation<V: Variant>() {
879        let num_validators: u32 = 4;
880        let quorum: u32 = 3;
881        let runner = deterministic::Runner::timed(Duration::from_secs(30));
882
883        runner.start(|mut context| async move {
884            let (polynomial, mut shares_vec) =
885                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
886            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
887
888            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
889                context.with_label("simulation"),
890                num_validators,
891                &mut shares_vec,
892                RELIABLE_LINK,
893            )
894            .await;
895            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
896            let mut reporters =
897                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
898
899            spawn_validator_engines::<V>(
900                context.with_label("validator"),
901                polynomial.clone(),
902                &pks,
903                &validators,
904                &mut registrations,
905                &mut automatons.lock().unwrap(),
906                &mut reporters,
907                &mut oracle,
908                Duration::from_secs(5),
909                |_| false,
910            );
911
912            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
913
914            // Additional validation: verify that consensus was achieved and items are retrievable
915            // The reporter mock already validates ack signatures internally and panics on invalid ones
916            for (validator_pk, mut reporter_mailbox) in reporters {
917                let tip_result = reporter_mailbox.get_tip().await;
918                assert!(
919                    tip_result.is_some(),
920                    "Reporter for validator {validator_pk:?} should have a tip"
921                );
922
923                let (tip_index, tip_epoch) = tip_result.unwrap();
924                assert!(
925                    tip_index >= 1,
926                    "Tip should have progressed beyond initial state for validator {validator_pk:?}"
927                );
928                assert_eq!(
929                    tip_epoch, 111,
930                    "Tip epoch should match expected epoch for validator {validator_pk:?}"
931                );
932
933                // Validate that we can retrieve the digest for consensus items
934                if tip_index > 0 {
935                    let item_result = reporter_mailbox.get(tip_index - 1).await;
936                    assert!(
937                        item_result.is_some(),
938                        "Should be able to retrieve consensus item for validator {validator_pk:?}"
939                    );
940                }
941            }
942        });
943    }
944
945    #[test_traced]
946    fn test_cryptographic_validation() {
947        cryptographic_validation::<MinPk>();
948        cryptographic_validation::<MinSig>();
949    }
950
951    /// Test various types of Byzantine fault patterns to ensure robustness.
952    fn advanced_byzantine_faults<V: Variant>() {
953        let num_validators: u32 = 7; // Larger set to test more fault combinations
954        let quorum: u32 = 5; // Can tolerate up to 2 Byzantine validators
955        let runner = deterministic::Runner::timed(Duration::from_secs(45));
956
957        runner.start(|mut context| async move {
958            let (polynomial, mut shares_vec) =
959                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
960            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
961
962            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
963                context.with_label("simulation"),
964                num_validators,
965                &mut shares_vec,
966                RELIABLE_LINK,
967            )
968            .await;
969            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
970            let mut reporters =
971                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
972
973            // More sophisticated Byzantine fault patterns
974            let advanced_byzantine_fn = |index: u64| -> bool {
975                use std::{
976                    collections::hash_map::DefaultHasher,
977                    hash::{Hash, Hasher},
978                };
979
980                let mut hasher = DefaultHasher::new();
981                index.hash(&mut hasher);
982                let hash_value = hasher.finish();
983
984                match index % 11 {
985                    // Use prime number for less predictable pattern
986                    // Occasional random faults (~8% of the time)
987                    0..=2 if (hash_value % 100) < 8 => true,
988                    // Burst faults: consecutive failures
989                    3..=5 if index > 10 && index < 15 => true,
990                    // Periodic but irregular faults
991                    7 if (hash_value % 13) == 0 => true,
992                    _ => false,
993                }
994            };
995
996            spawn_validator_engines::<V>(
997                context.with_label("validator"),
998                polynomial.clone(),
999                &pks,
1000                &validators,
1001                &mut registrations,
1002                &mut automatons.lock().unwrap(),
1003                &mut reporters,
1004                &mut oracle,
1005                Duration::from_secs(8), // Longer timeout for more complex scenarios
1006                advanced_byzantine_fn,
1007            );
1008
1009            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1010        });
1011    }
1012
1013    #[test_traced]
1014    fn test_advanced_byzantine_faults() {
1015        advanced_byzantine_faults::<MinPk>();
1016        advanced_byzantine_faults::<MinSig>();
1017    }
1018
1019    /// Test insufficient validator participation (below quorum).
1020    fn insufficient_validators<V: Variant>() {
1021        let num_validators: u32 = 5;
1022        let quorum: u32 = 3;
1023        let runner = deterministic::Runner::timed(Duration::from_secs(15));
1024
1025        runner.start(|mut context| async move {
1026            let (polynomial, mut shares_vec) =
1027                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1028            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1029
1030            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1031                context.with_label("simulation"),
1032                num_validators,
1033                &mut shares_vec,
1034                RELIABLE_LINK,
1035            )
1036            .await;
1037            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1038            let mut reporters =
1039                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1040
1041            // Start only 2 out of 5 validators (below quorum of 3)
1042            let insufficient_validators: Vec<_> = validators.iter().take(2).cloned().collect();
1043            let insufficient_pks: Vec<_> = pks.iter().take(2).cloned().collect();
1044
1045            spawn_validator_engines::<V>(
1046                context.with_label("validator"),
1047                polynomial.clone(),
1048                &insufficient_pks,
1049                &insufficient_validators,
1050                &mut registrations,
1051                &mut automatons.lock().unwrap(),
1052                &mut reporters,
1053                &mut oracle,
1054                Duration::from_secs(3),
1055                |_| false,
1056            );
1057
1058            // With insufficient validators, consensus should not be achievable
1059            // Wait long enough for any potential consensus attempts to complete
1060            context.sleep(Duration::from_secs(12)).await;
1061
1062            // Check that no validator achieved consensus through verified threshold signatures
1063            let mut any_consensus = false;
1064            for (validator_pk, mut reporter_mailbox) in reporters {
1065                // The reporter only advances contiguous_tip when valid threshold signatures are received
1066                // and cryptographically verified. A contiguous_tip > 0 means at least one threshold
1067                // signature was successfully created and validated, which should be impossible
1068                // with insufficient validators (below quorum).
1069                let contiguous_tip = reporter_mailbox.get_contiguous_tip().await.unwrap_or(0);
1070                if contiguous_tip > 0 {
1071                    any_consensus = true;
1072                    tracing::warn!(
1073                        ?validator_pk,
1074                        contiguous_tip,
1075                        "Unexpected threshold signature consensus with insufficient validators"
1076                    );
1077                }
1078            }
1079
1080            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
1081            assert!(
1082                !any_consensus,
1083                "Consensus should not be achieved with insufficient validator participation (below quorum)"
1084            );
1085        });
1086    }
1087
1088    #[test_traced]
1089    fn test_insufficient_validators() {
1090        insufficient_validators::<MinPk>();
1091        insufficient_validators::<MinSig>();
1092    }
1093
1094    /// Test that verifies threshold signatures are mathematically correct and properly formed.
1095    fn threshold_signature_correctness<V: Variant>() {
1096        let num_validators: u32 = 4;
1097        let quorum: u32 = 3;
1098        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1099
1100        runner.start(|mut context| async move {
1101            let (polynomial, mut shares_vec) =
1102                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1103            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1104
1105            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1106                context.with_label("simulation"),
1107                num_validators,
1108                &mut shares_vec,
1109                RELIABLE_LINK,
1110            )
1111            .await;
1112            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1113            let mut reporters =
1114                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1115
1116            spawn_validator_engines::<V>(
1117                context.with_label("validator"),
1118                polynomial.clone(),
1119                &pks,
1120                &validators,
1121                &mut registrations,
1122                &mut automatons.lock().unwrap(),
1123                &mut reporters,
1124                &mut oracle,
1125                Duration::from_secs(5),
1126                |_| false, // No Byzantine faults for this test
1127            );
1128
1129            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1130
1131            // Now verify that all consensus items have mathematically valid threshold signatures
1132            for (validator_pk, mut reporter_mailbox) in reporters {
1133                let tip_result = reporter_mailbox.get_tip().await;
1134                assert!(
1135                    tip_result.is_some(),
1136                    "Reporter should have achieved consensus"
1137                );
1138
1139                let (tip_index, _) = tip_result.unwrap();
1140
1141                // Check each consensus item up to the tip (starting from 1, as 0 might be genesis)
1142                for index in 1..=tip_index {
1143                    let item_result = reporter_mailbox.get(index).await;
1144                    assert!(
1145                        item_result.is_some(),
1146                        "Should have consensus item at index {index}"
1147                    );
1148
1149                    let (digest, epoch) = item_result.unwrap();
1150
1151                    // Manually verify that this item would have a valid threshold signature
1152                    // by constructing the expected item and checking signature validity
1153                    let _item = super::types::Item { index, digest };
1154                    let mut ack_namespace = b"my testing namespace".to_vec();
1155                    ack_namespace.extend_from_slice(b"_AGG_ACK");
1156
1157                    // We can't directly access the threshold signature from the reporter,
1158                    // but we can verify that the reporter's acceptance implies valid crypto.
1159                    // The reporter mock now validates threshold signatures and would panic
1160                    // if they were invalid, so reaching this point means they were valid.
1161
1162                    tracing::debug!(
1163                        ?validator_pk,
1164                        index,
1165                        epoch,
1166                        "Verified valid threshold signature for consensus item"
1167                    );
1168                }
1169            }
1170        });
1171    }
1172
1173    #[test_traced]
1174    fn test_threshold_signature_correctness() {
1175        threshold_signature_correctness::<MinPk>();
1176        threshold_signature_correctness::<MinSig>();
1177    }
1178
1179    /// Test that manually constructs and verifies threshold signatures.
1180    fn manual_threshold_verification<V: Variant>() {
1181        let runner = deterministic::Runner::timed(Duration::from_secs(10));
1182
1183        runner.start(|mut context| async move {
1184            let num_validators = 4u32;
1185            let quorum = 3u32;
1186
1187            // Generate threshold cryptography setup
1188            let (polynomial, shares_vec) =
1189                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1190
1191            // Create a test item to sign
1192            let test_item = super::types::Item {
1193                index: 42,
1194                digest: Sha256Digest::from([1u8; 32]),
1195            };
1196
1197            let namespace = b"test_namespace";
1198            let ack_namespace = [namespace.as_slice(), b"_AGG_ACK"].concat();
1199
1200            // Generate partial signatures from sufficient validators (quorum=3)
1201            let mut partial_sigs = Vec::new();
1202            for share in shares_vec.iter().take(quorum as usize) {
1203                let partial_sig = bls_ops::partial_sign_message::<V>(
1204                    share,
1205                    Some(&ack_namespace),
1206                    &test_item.encode(),
1207                );
1208                partial_sigs.push(partial_sig);
1209            }
1210
1211            // Recover partial signatures into threshold signature
1212            let threshold_sig = poly::Signature::<V>::recover(quorum, &partial_sigs).expect(
1213                "Should be able to recover threshold signature from sufficient partial signatures",
1214            );
1215
1216            // Verify the threshold signature
1217            let threshold_public = poly::public::<V>(&polynomial);
1218            let verification_result = bls_ops::verify_message::<V>(
1219                threshold_public,
1220                Some(&ack_namespace),
1221                &test_item.encode(),
1222                &threshold_sig,
1223            );
1224
1225            assert!(
1226                verification_result.is_ok(),
1227                "Manually constructed threshold signature should be valid: {:?}",
1228                verification_result.err()
1229            );
1230
1231            // Test with insufficient signatures (should fail)
1232            let insufficient_partial_sigs: Vec<_> = partial_sigs
1233                .iter()
1234                .take(quorum as usize - 1)
1235                .cloned()
1236                .collect();
1237            let insufficient_result =
1238                poly::Signature::<V>::recover(quorum, &insufficient_partial_sigs);
1239
1240            assert!(
1241                insufficient_result.is_err(),
1242                "Should not be able to recover threshold signature with insufficient partial signatures"
1243            );
1244
1245            tracing::debug!("Manual threshold signature verification completed successfully");
1246        });
1247    }
1248
1249    #[test_traced]
1250    fn test_manual_threshold_verification() {
1251        manual_threshold_verification::<MinPk>();
1252        manual_threshold_verification::<MinSig>();
1253    }
1254}