commonware_consensus/aggregation/
mod.rs

//! Recover threshold signatures over an externally synchronized sequence of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce threshold signatures
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
//! _For applications that want to collect threshold signatures over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
13//!
14//! # Architecture
15//!
16//! The core of the module is the [Engine]. It manages the agreement process by:
17//! - Requesting externally synchronized [commonware_cryptography::Digest]s
18//! - Signing said digests with BLS [commonware_cryptography::bls12381::primitives::poly::PartialSignature]
19//! - Multicasting partial signatures to other validators
20//! - Recovering [commonware_cryptography::bls12381::primitives::poly::Signature]s from a quorum of partial signatures
21//! - Monitoring recovery progress and notifying the application layer of recoveries
22//!
23//! The engine interacts with four main components:
24//! - [crate::Automaton]: Provides external digests
25//! - [crate::Reporter]: Receives agreement confirmations
26//! - [crate::Monitor]: Tracks epoch transitions
27//! - [crate::ThresholdSupervisor]: Manages validator sets and network identities
28//!
29//! # Design Decisions
30//!
31//! ## Missing Signature Resolution
32//!
//! The engine does not try to "fill gaps" when threshold signatures are missing. When validators
//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
//! signatures may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the threshold signature for any skipped index. This design
//! is intentional: it prioritizes producing threshold signatures as quickly as possible. By
//! advancing the tip, honest validators can continue producing threshold signatures for new
//! indices rather than getting stuck trying to backfill missing signatures. Validators who are
//! online and honest can maintain consensus even when others fall behind or go offline.
//! Backfilling missing signatures is left to other parts of the application that can implement
//! appropriate recovery strategies.
43
44pub mod types;
45
46cfg_if::cfg_if! {
47    if #[cfg(not(target_arch = "wasm32"))] {
48        mod config;
49        pub use config::Config;
50        mod engine;
51        pub use engine::Engine;
52        mod metrics;
53        mod safe_tip;
54
55        #[cfg(test)]
56        pub mod mocks;
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::{mocks, types::Epoch, Config, Engine};
63    use commonware_codec::Encode;
64    use commonware_cryptography::{
65        bls12381::{
66            dkg::ops,
67            primitives::{
68                group::Share,
69                ops as bls_ops, poly,
70                variant::{MinPk, MinSig, Variant},
71            },
72        },
73        ed25519::{PrivateKey, PublicKey},
74        sha256::Digest as Sha256Digest,
75        PrivateKeyExt as _, Signer as _,
76    };
77    use commonware_macros::{select, test_traced};
78    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
79    use commonware_runtime::{
80        deterministic::{self, Context},
81        Clock, Metrics, Runner, Spawner,
82    };
83    use commonware_utils::NonZeroDuration;
84    use futures::{channel::oneshot, future::join_all};
85    use rand::{rngs::StdRng, Rng, SeedableRng};
86    use std::{
87        collections::{BTreeMap, HashMap},
88        sync::{Arc, Mutex},
89        time::Duration,
90    };
91    use tracing::debug;
92
93    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
94
95    /// Reliable network link configuration for testing.
96    const RELIABLE_LINK: Link = Link {
97        latency: 10.0,
98        jitter: 1.0,
99        success_rate: 1.0,
100    };
101
102    /// Register all participants with the network oracle.
103    async fn register_participants(
104        oracle: &mut Oracle<PublicKey>,
105        participants: &[PublicKey],
106    ) -> Registrations<PublicKey> {
107        let mut registrations = BTreeMap::new();
108        for participant in participants.iter() {
109            let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
110            registrations.insert(participant.clone(), (sender, receiver));
111        }
112        registrations
113    }
114
115    /// Establish network links between all participants.
116    async fn link_participants(
117        oracle: &mut Oracle<PublicKey>,
118        participants: &[PublicKey],
119        link: Link,
120    ) {
121        for v1 in participants.iter() {
122            for v2 in participants.iter() {
123                if v2 == v1 {
124                    continue;
125                }
126                oracle
127                    .add_link(v1.clone(), v2.clone(), link.clone())
128                    .await
129                    .unwrap();
130            }
131        }
132    }
133
134    /// Initialize a simulated network environment.
135    async fn initialize_simulation(
136        context: Context,
137        num_validators: u32,
138        shares_vec: &mut [Share],
139        link: Link,
140    ) -> (
141        Oracle<PublicKey>,
142        Vec<(PublicKey, PrivateKey, Share)>,
143        Vec<PublicKey>,
144        Registrations<PublicKey>,
145    ) {
146        let (network, mut oracle) = Network::new(
147            context.with_label("network"),
148            commonware_p2p::simulated::Config {
149                max_size: 1024 * 1024,
150            },
151        );
152        network.start();
153
154        let mut schemes = (0..num_validators)
155            .map(|i| PrivateKey::from_seed(i as u64))
156            .collect::<Vec<_>>();
157        schemes.sort_by_key(|s| s.public_key());
158        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
159            .iter()
160            .enumerate()
161            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
162            .collect();
163        let pks = validators
164            .iter()
165            .map(|(pk, _, _)| pk.clone())
166            .collect::<Vec<_>>();
167
168        let registrations = register_participants(&mut oracle, &pks).await;
169        link_participants(&mut oracle, &pks, link).await;
170        (oracle, validators, pks, registrations)
171    }
172
173    /// Spawn aggregation engines for all validators.
174    #[allow(clippy::too_many_arguments)]
175    fn spawn_validator_engines<V: Variant>(
176        context: Context,
177        polynomial: poly::Public<V>,
178        validator_pks: &[PublicKey],
179        validators: &[(PublicKey, PrivateKey, Share)],
180        registrations: &mut Registrations<PublicKey>,
181        automatons: &mut BTreeMap<PublicKey, mocks::Application>,
182        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
183        oracle: &mut Oracle<PublicKey>,
184        rebroadcast_timeout: Duration,
185        invalid_when: fn(u64) -> bool,
186    ) -> HashMap<PublicKey, mocks::Monitor> {
187        let mut monitors = HashMap::new();
188        let namespace = b"my testing namespace";
189        for (validator, _scheme, share) in validators.iter() {
190            let context = context.with_label(&validator.to_string());
191            let monitor = mocks::Monitor::new(111);
192            monitors.insert(validator.clone(), monitor.clone());
193            let supervisor = {
194                let mut s = mocks::Supervisor::<PublicKey, V>::default();
195                s.add_epoch(
196                    111,
197                    share.clone(),
198                    polynomial.clone(),
199                    validator_pks.to_vec(),
200                );
201                s
202            };
203
204            let blocker = oracle.control(validator.clone());
205
206            let automaton = mocks::Application::new(invalid_when);
207            automatons.insert(validator.clone(), automaton.clone());
208
209            let (reporter, reporter_mailbox) =
210                mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
211            context.with_label("reporter").spawn(|_| reporter.run());
212            reporters.insert(validator.clone(), reporter_mailbox);
213
214            let engine = Engine::new(
215                context.with_label("engine"),
216                Config {
217                    monitor,
218                    validators: supervisor,
219                    automaton: automaton.clone(),
220                    reporter: reporters.get(validator).unwrap().clone(),
221                    blocker,
222                    namespace: namespace.to_vec(),
223                    priority_acks: false,
224                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
225                    epoch_bounds: (1, 1),
226                    window: std::num::NonZeroU64::new(10).unwrap(),
227                    journal_partition: format!("aggregation/{validator}"),
228                    journal_write_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
229                    journal_replay_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
230                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
231                    journal_compression: Some(3),
232                },
233            );
234
235            let (sender, receiver) = registrations.remove(validator).unwrap();
236            engine.start((sender, receiver));
237        }
238        monitors
239    }
240
241    /// Wait for all reporters to reach the specified consensus threshold.
242    async fn await_reporters<V: Variant>(
243        context: Context,
244        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
245        threshold_index: u64,
246        threshold_epoch: Epoch,
247    ) {
248        let mut receivers = Vec::new();
249        for (reporter, mailbox) in reporters.iter() {
250            // Create a oneshot channel to signal when the reporter has reached the threshold.
251            let (tx, rx) = oneshot::channel();
252            receivers.push(rx);
253
254            context.with_label("reporter_watcher").spawn({
255                let reporter = reporter.clone();
256                let mut mailbox = mailbox.clone();
257                move |context| async move {
258                    loop {
259                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
260                        let contiguous_index = mailbox.get_contiguous_tip().await.unwrap_or(0);
261                        debug!(
262                            index,
263                            epoch,
264                            contiguous_index,
265                            threshold_index,
266                            threshold_epoch,
267                            ?reporter,
268                            "reporter status"
269                        );
270                        if contiguous_index >= threshold_index && epoch >= threshold_epoch {
271                            debug!(
272                                ?reporter,
273                                "reporter reached threshold, signaling completion"
274                            );
275                            let _ = tx.send(reporter.clone());
276                            break;
277                        }
278                        context.sleep(Duration::from_millis(100)).await;
279                    }
280                }
281            });
282        }
283
284        // Wait for all oneshot receivers to complete.
285        let results = join_all(receivers).await;
286        assert_eq!(results.len(), reporters.len());
287
288        // Check that none were cancelled.
289        for result in results {
290            assert!(result.is_ok(), "reporter was cancelled");
291        }
292    }
293
294    /// Test aggregation consensus with all validators online.
295    fn all_online<V: Variant>() {
296        let num_validators: u32 = 4;
297        let quorum: u32 = 3;
298        let runner = deterministic::Runner::timed(Duration::from_secs(30));
299
300        runner.start(|mut context| async move {
301            let (polynomial, mut shares_vec) =
302                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
303            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
304
305            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
306                context.with_label("simulation"),
307                num_validators,
308                &mut shares_vec,
309                RELIABLE_LINK,
310            )
311            .await;
312            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
313            let mut reporters =
314                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
315            spawn_validator_engines::<V>(
316                context.with_label("validator"),
317                polynomial.clone(),
318                &pks,
319                &validators,
320                &mut registrations,
321                &mut automatons.lock().unwrap(),
322                &mut reporters,
323                &mut oracle,
324                Duration::from_secs(5),
325                |_| false,
326            );
327            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
328        });
329    }
330
331    #[test_traced]
332    fn test_all_online() {
333        all_online::<MinPk>();
334        all_online::<MinSig>();
335    }
336
337    fn unclean_shutdown<V: Variant>() {
338        // Test parameters
339        let num_validators: u32 = 4;
340        let quorum: u32 = 3;
341        let target_index = 200; // Target multiple rounds of signing
342        let max_shutdowns = 10; // Maximum number of shutdowns per validator
343        let min_shutdowns = 4; // Minimum number of shutdowns per validator
344        let shutdown_range_min = Duration::from_millis(100);
345        let shutdown_range_max = Duration::from_millis(1_000);
346
347        // Must be shorter than the maximum shutdown range to make progress after restarting
348        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
349
350        let mut prev_ctx = None;
351        let shutdown_counts = Arc::new(Mutex::new(HashMap::<PublicKey, u32>::new()));
352        let completed_validators = Arc::new(Mutex::new(std::collections::HashSet::new()));
353        let all_validators = Arc::new(Mutex::new(Vec::new()));
354
355        // Generate shares once
356        let mut rng = StdRng::seed_from_u64(0);
357        let (polynomial, mut shares_vec) =
358            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
359        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
360
361        // Continue until all validators reach target or max shutdowns exceeded
362        while completed_validators.lock().unwrap().len() < num_validators as usize
363            && shutdown_counts.lock().unwrap().values().max().unwrap_or(&0) < &max_shutdowns
364        {
365            let completed_clone = completed_validators.clone();
366            let shutdown_counts_clone = shutdown_counts.clone();
367            let all_validators_clone = all_validators.clone();
368            let shares_vec_clone = shares_vec.clone();
369            let polynomial_clone = polynomial.clone();
370
371            let f = move |mut context: Context| {
372                let completed = completed_clone;
373                let shutdown_counts = shutdown_counts_clone;
374                let all_validators = all_validators_clone;
375                let mut shares_vec = shares_vec_clone;
376                let polynomial = polynomial_clone;
377                async move {
378                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
379                        context.with_label("simulation"),
380                        num_validators,
381                        &mut shares_vec,
382                        RELIABLE_LINK,
383                    )
384                    .await;
385                    // Store all validator public keys if not already done
386                    if all_validators.lock().unwrap().is_empty() {
387                        let mut pks_lock = all_validators.lock().unwrap();
388                        *pks_lock = pks.clone();
389                    }
390                    let automatons =
391                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
392                    let mut reporters =
393                        BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
394
395                    // Use unique journal partitions for each validator to enable restart recovery
396                    let mut engine_monitors = HashMap::new();
397                    let namespace = b"my testing namespace";
398                    for (validator, _scheme, share) in validators.iter() {
399                        let validator_context = context.with_label(&validator.to_string());
400                        let monitor = mocks::Monitor::new(111);
401                        engine_monitors.insert(validator.clone(), monitor.clone());
402                        let supervisor = {
403                            let mut s = mocks::Supervisor::<PublicKey, V>::default();
404                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
405                            s
406                        };
407
408                        let blocker = oracle.control(validator.clone());
409                        let automaton = mocks::Application::new(|_| false);
410                        automatons
411                            .lock()
412                            .unwrap()
413                            .insert(validator.clone(), automaton.clone());
414
415                        let (reporter, reporter_mailbox) =
416                            mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
417                        validator_context
418                            .with_label("reporter")
419                            .spawn(|_| reporter.run());
420                        reporters.insert(validator.clone(), reporter_mailbox);
421
422                        let engine = Engine::new(
423                            validator_context.with_label("engine"),
424                            Config {
425                                monitor,
426                                validators: supervisor,
427                                automaton,
428                                reporter: reporters.get(validator).unwrap().clone(),
429                                blocker,
430                                namespace: namespace.to_vec(),
431                                priority_acks: false,
432                                rebroadcast_timeout,
433                                epoch_bounds: (1, 1),
434                                window: std::num::NonZeroU64::new(10).unwrap(),
435                                // Use validator-specific partition for journal recovery
436                                journal_partition: format!("unclean_shutdown_test/{validator}"),
437                                journal_write_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
438                                journal_replay_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
439                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
440                                journal_compression: Some(3),
441                            },
442                        );
443
444                        let (sender, receiver) = registrations.remove(validator).unwrap();
445                        engine.start((sender, receiver));
446                    }
447
448                    // Create completion watchers for all validators
449                    let mut completion_tasks = Vec::new();
450                    for (validator_pk, mut reporter_mailbox) in reporters.clone() {
451                        let validator = validator_pk.clone();
452                        let completed_ref = completed.clone();
453                        let task = context.with_label("completion_watcher").spawn(
454                            move |context| async move {
455                                loop {
456                                    if let Some((tip_index, _epoch)) =
457                                        reporter_mailbox.get_tip().await
458                                    {
459                                        if tip_index >= target_index {
460                                            // Verify that validator has signed messages at all indices
461                                            for check_index in 0..=tip_index {
462                                                if let Some((digest, epoch)) =
463                                                    reporter_mailbox.get(check_index).await
464                                                {
465                                                    assert_eq!(
466                                                        epoch, 111,
467                                                        "Epoch should be consistent"
468                                                    );
469                                                    debug!(
470                                                        ?validator,
471                                                        check_index,
472                                                        ?digest,
473                                                        "Verified validator signed message"
474                                                    );
475                                                }
476                                            }
477                                            completed_ref.lock().unwrap().insert(validator.clone());
478                                            debug!(
479                                                ?validator,
480                                                tip_index, "Validator completed signing target"
481                                            );
482                                            break;
483                                        }
484                                    }
485                                    context.sleep(Duration::from_millis(50)).await;
486                                }
487                            },
488                        );
489                        completion_tasks.push(task);
490                    }
491
492                    // Random shutdown timing to simulate unclean shutdown
493                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
494
495                    select! {
496                        _ = context.sleep(shutdown_wait) => {
497                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
498                            // Track which validators were running when shutdown occurred
499                            let mut counts = shutdown_counts.lock().unwrap();
500                            for (pk, _) in reporters {
501                                if !completed.lock().unwrap().contains(&pk) {
502                                    *counts.entry(pk).or_insert(0) += 1;
503                                }
504                            }
505                            (false, context) // Unclean shutdown
506                        },
507                        _ = join_all(completion_tasks) => {
508                            debug!("All validators completed normally");
509                            (true, context) // Clean completion
510                        }
511                    }
512                }
513            };
514
515            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
516                let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
517                debug!(shutdown_count, "Restarting from previous context");
518                deterministic::Runner::from(prev_ctx)
519            } else {
520                debug!("Starting initial run");
521                deterministic::Runner::timed(Duration::from_secs(45))
522            }
523            .start(f);
524
525            prev_ctx = Some(context.recover());
526
527            if complete {
528                debug!("Test completed successfully");
529                break;
530            }
531
532            let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
533            debug!(
534                shutdown_count,
535                completed = completed_validators.lock().unwrap().len(),
536                "Shutdown occurred, restarting"
537            );
538        }
539
540        // Verify that all validators eventually reached the target
541        let final_completed = completed_validators.lock().unwrap().len();
542        let total_shutdowns = shutdown_counts.lock().unwrap().values().sum::<u32>();
543        assert_eq!(
544            final_completed, num_validators as usize,
545            "All validators should reach target index {target_index} despite unclean shutdowns. Only {final_completed} completed after {total_shutdowns} shutdowns"
546        );
547
548        // Verify that each validator experienced a minimum number of shutdowns
549        let counts = shutdown_counts.lock().unwrap();
550        for pk in all_validators.lock().unwrap().iter() {
551            let count = counts.get(pk).copied().unwrap_or(0);
552            assert!(
553                count >= min_shutdowns,
554                "Validator {pk:?} should have at least {min_shutdowns} shutdowns, but had {count}"
555            );
556        }
557
558        debug!(
559            total_shutdowns,
560            target_index, "Unclean shutdown test completed successfully"
561        );
562    }
563
564    #[test_traced]
565    fn test_unclean_shutdown() {
566        unclean_shutdown::<MinPk>();
567        unclean_shutdown::<MinSig>();
568    }
569
570    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
571        let num_validators: u32 = 4;
572        let quorum: u32 = 3;
573        let cfg = deterministic::Config::new()
574            .with_seed(seed)
575            .with_timeout(Some(Duration::from_secs(120)));
576        let runner = deterministic::Runner::new(cfg);
577
578        runner.start(|mut context| async move {
579            let (polynomial, mut shares_vec) =
580                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
581            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
582
583            // Use degraded network links with realistic conditions
584            let degraded_link = Link {
585                latency: 200.0,
586                jitter: 150.0,
587                success_rate: 0.5,
588            };
589
590            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
591                context.with_label("simulation"),
592                num_validators,
593                &mut shares_vec,
594                degraded_link,
595            )
596            .await;
597            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
598            let mut reporters =
599                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
600
601            spawn_validator_engines::<V>(
602                context.with_label("validator"),
603                polynomial.clone(),
604                &pks,
605                &validators,
606                &mut registrations,
607                &mut automatons.lock().unwrap(),
608                &mut reporters,
609                &mut oracle,
610                Duration::from_secs(2),
611                |_| false,
612            );
613
614            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
615
616            context.auditor().state()
617        })
618    }
619
620    #[test_traced]
621    fn test_slow_and_lossy_links() {
622        slow_and_lossy_links::<MinPk>(0);
623        slow_and_lossy_links::<MinSig>(0);
624    }
625
626    #[test_traced]
627    fn test_determinism() {
628        // We use slow and lossy links as the deterministic test
629        // because it is the most complex test.
630        for seed in 1..6 {
631            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
632            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
633            assert_eq!(pk_state_1, pk_state_2);
634
635            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
636            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
637            assert_eq!(sig_state_1, sig_state_2);
638
639            // Sanity check that different types can't be identical.
640            assert_ne!(pk_state_1, sig_state_1);
641        }
642    }
643
644    fn one_offline<V: Variant>() {
645        let num_validators: u32 = 5;
646        let quorum: u32 = 3;
647        let runner = deterministic::Runner::timed(Duration::from_secs(30));
648
649        runner.start(|mut context| async move {
650            let (polynomial, mut shares_vec) =
651                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
652            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
653
654            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
655                context.with_label("simulation"),
656                num_validators,
657                &mut shares_vec,
658                RELIABLE_LINK,
659            )
660            .await;
661            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
662            let mut reporters =
663                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
664
665            // Start only 4 out of 5 validators (one offline)
666            let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
667            let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
668
669            spawn_validator_engines::<V>(
670                context.with_label("validator"),
671                polynomial.clone(),
672                &online_pks,
673                &online_validators,
674                &mut registrations,
675                &mut automatons.lock().unwrap(),
676                &mut reporters,
677                &mut oracle,
678                Duration::from_secs(5),
679                |_| false,
680            );
681            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
682        });
683    }
684
685    #[test_traced]
686    fn test_one_offline() {
687        one_offline::<MinPk>();
688        one_offline::<MinSig>();
689    }
690
691    /// Test that consensus can be reached starting from index 0.
692    fn consensus_from_index_zero<V: Variant>() {
693        let num_validators: u32 = 4;
694        let quorum: u32 = 3;
695        let runner = deterministic::Runner::timed(Duration::from_secs(30));
696
697        runner.start(|mut context| async move {
698            let (polynomial, mut shares_vec) =
699                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
700            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
701
702            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
703                context.with_label("simulation"),
704                num_validators,
705                &mut shares_vec,
706                RELIABLE_LINK,
707            )
708            .await;
709            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
710            let mut reporters =
711                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
712
713            spawn_validator_engines::<V>(
714                context.with_label("validator"),
715                polynomial.clone(),
716                &pks,
717                &validators,
718                &mut registrations,
719                &mut automatons.lock().unwrap(),
720                &mut reporters,
721                &mut oracle,
722                Duration::from_secs(5),
723                |_| false,
724            );
725
726            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
727        });
728    }
729
730    #[test_traced]
731    fn test_consensus_from_index_zero() {
732        consensus_from_index_zero::<MinPk>();
733        consensus_from_index_zero::<MinSig>();
734    }
735
736    /// Test consensus recovery after a network partition.
737    fn network_partition<V: Variant>() {
738        let num_validators: u32 = 4;
739        let quorum: u32 = 3;
740        let runner = deterministic::Runner::timed(Duration::from_secs(60));
741
742        runner.start(|mut context| async move {
743            let (polynomial, mut shares_vec) =
744                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
745            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
746
747            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
748                context.with_label("simulation"),
749                num_validators,
750                &mut shares_vec,
751                RELIABLE_LINK,
752            )
753            .await;
754            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
755            let mut reporters =
756                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
757
758            spawn_validator_engines::<V>(
759                context.with_label("validator"),
760                polynomial.clone(),
761                &pks,
762                &validators,
763                &mut registrations,
764                &mut automatons.lock().unwrap(),
765                &mut reporters,
766                &mut oracle,
767                Duration::from_secs(5),
768                |_| false,
769            );
770
771            for v1 in pks.iter() {
772                for v2 in pks.iter() {
773                    if v2 == v1 {
774                        continue;
775                    }
776                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
777                }
778            }
779            context.sleep(Duration::from_secs(20)).await;
780
781            let link = Link {
782                latency: 10.0,
783                jitter: 1.0,
784                success_rate: 1.0,
785            };
786            for v1 in pks.iter() {
787                for v2 in pks.iter() {
788                    if v2 == v1 {
789                        continue;
790                    }
791                    oracle
792                        .add_link(v1.clone(), v2.clone(), link.clone())
793                        .await
794                        .unwrap();
795                }
796            }
797
798            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
799        });
800    }
801
802    #[test_traced]
803    fn test_network_partition() {
804        network_partition::<MinPk>();
805        network_partition::<MinSig>();
806    }
807
808    /// Test consensus resilience to Byzantine behavior.
809    fn invalid_signature_injection<V: Variant>() {
810        let num_validators: u32 = 4;
811        let quorum: u32 = 3;
812        let runner = deterministic::Runner::timed(Duration::from_secs(30));
813
814        runner.start(|mut context| async move {
815            let (polynomial, mut shares_vec) =
816                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
817            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
818
819            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
820                context.with_label("simulation"),
821                num_validators,
822                &mut shares_vec,
823                RELIABLE_LINK,
824            )
825            .await;
826            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
827            let mut reporters =
828                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
829
830            // Simulate more realistic Byzantine behavior with pseudo-random faults
831            // Using a deterministic seed based on index for reproducible tests
832            let byzantine_fault_fn = |index: u64| -> bool {
833                use std::{
834                    collections::hash_map::DefaultHasher,
835                    hash::{Hash, Hasher},
836                };
837
838                let mut hasher = DefaultHasher::new();
839                index.hash(&mut hasher);
840                let hash_value = hasher.finish();
841
842                // Create Byzantine faults with ~5% probability using deterministic hash
843                // This simulates realistic sporadic Byzantine behavior
844                (hash_value % 100) < 5
845            };
846
847            spawn_validator_engines::<V>(
848                context.with_label("validator"),
849                polynomial.clone(),
850                &pks,
851                &validators,
852                &mut registrations,
853                &mut automatons.lock().unwrap(),
854                &mut reporters,
855                &mut oracle,
856                Duration::from_secs(5),
857                byzantine_fault_fn,
858            );
859
860            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
861        });
862    }
863
864    #[test_traced]
865    fn test_invalid_signature_injection() {
866        invalid_signature_injection::<MinPk>();
867        invalid_signature_injection::<MinSig>();
868    }
869
870    /// Test that verifies cryptographic signatures are properly validated.
871    fn cryptographic_validation<V: Variant>() {
872        let num_validators: u32 = 4;
873        let quorum: u32 = 3;
874        let runner = deterministic::Runner::timed(Duration::from_secs(30));
875
876        runner.start(|mut context| async move {
877            let (polynomial, mut shares_vec) =
878                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
879            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
880
881            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
882                context.with_label("simulation"),
883                num_validators,
884                &mut shares_vec,
885                RELIABLE_LINK,
886            )
887            .await;
888            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
889            let mut reporters =
890                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
891
892            spawn_validator_engines::<V>(
893                context.with_label("validator"),
894                polynomial.clone(),
895                &pks,
896                &validators,
897                &mut registrations,
898                &mut automatons.lock().unwrap(),
899                &mut reporters,
900                &mut oracle,
901                Duration::from_secs(5),
902                |_| false,
903            );
904
905            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
906
907            // Additional validation: verify that consensus was achieved and items are retrievable
908            // The reporter mock already validates ack signatures internally and panics on invalid ones
909            for (validator_pk, mut reporter_mailbox) in reporters {
910                let tip_result = reporter_mailbox.get_tip().await;
911                assert!(
912                    tip_result.is_some(),
913                    "Reporter for validator {validator_pk:?} should have a tip"
914                );
915
916                let (tip_index, tip_epoch) = tip_result.unwrap();
917                assert!(
918                    tip_index >= 1,
919                    "Tip should have progressed beyond initial state for validator {validator_pk:?}"
920                );
921                assert_eq!(
922                    tip_epoch, 111,
923                    "Tip epoch should match expected epoch for validator {validator_pk:?}"
924                );
925
926                // Validate that we can retrieve the digest for consensus items
927                if tip_index > 0 {
928                    let item_result = reporter_mailbox.get(tip_index - 1).await;
929                    assert!(
930                        item_result.is_some(),
931                        "Should be able to retrieve consensus item for validator {validator_pk:?}"
932                    );
933                }
934            }
935        });
936    }
937
938    #[test_traced]
939    fn test_cryptographic_validation() {
940        cryptographic_validation::<MinPk>();
941        cryptographic_validation::<MinSig>();
942    }
943
944    /// Test various types of Byzantine fault patterns to ensure robustness.
945    fn advanced_byzantine_faults<V: Variant>() {
946        let num_validators: u32 = 7; // Larger set to test more fault combinations
947        let quorum: u32 = 5; // Can tolerate up to 2 Byzantine validators
948        let runner = deterministic::Runner::timed(Duration::from_secs(45));
949
950        runner.start(|mut context| async move {
951            let (polynomial, mut shares_vec) =
952                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
953            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
954
955            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
956                context.with_label("simulation"),
957                num_validators,
958                &mut shares_vec,
959                RELIABLE_LINK,
960            )
961            .await;
962            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
963            let mut reporters =
964                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
965
966            // More sophisticated Byzantine fault patterns
967            let advanced_byzantine_fn = |index: u64| -> bool {
968                use std::{
969                    collections::hash_map::DefaultHasher,
970                    hash::{Hash, Hasher},
971                };
972
973                let mut hasher = DefaultHasher::new();
974                index.hash(&mut hasher);
975                let hash_value = hasher.finish();
976
977                match index % 11 {
978                    // Use prime number for less predictable pattern
979                    // Occasional random faults (~8% of the time)
980                    0..=2 if (hash_value % 100) < 8 => true,
981                    // Burst faults: consecutive failures
982                    3..=5 if index > 10 && index < 15 => true,
983                    // Periodic but irregular faults
984                    7 if (hash_value % 13) == 0 => true,
985                    _ => false,
986                }
987            };
988
989            spawn_validator_engines::<V>(
990                context.with_label("validator"),
991                polynomial.clone(),
992                &pks,
993                &validators,
994                &mut registrations,
995                &mut automatons.lock().unwrap(),
996                &mut reporters,
997                &mut oracle,
998                Duration::from_secs(8), // Longer timeout for more complex scenarios
999                advanced_byzantine_fn,
1000            );
1001
1002            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1003        });
1004    }
1005
1006    #[test_traced]
1007    fn test_advanced_byzantine_faults() {
1008        advanced_byzantine_faults::<MinPk>();
1009        advanced_byzantine_faults::<MinSig>();
1010    }
1011
1012    /// Test insufficient validator participation (below quorum).
1013    fn insufficient_validators<V: Variant>() {
1014        let num_validators: u32 = 5;
1015        let quorum: u32 = 3;
1016        let runner = deterministic::Runner::timed(Duration::from_secs(15));
1017
1018        runner.start(|mut context| async move {
1019            let (polynomial, mut shares_vec) =
1020                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1021            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1022
1023            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1024                context.with_label("simulation"),
1025                num_validators,
1026                &mut shares_vec,
1027                RELIABLE_LINK,
1028            )
1029            .await;
1030            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1031            let mut reporters =
1032                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1033
1034            // Start only 2 out of 5 validators (below quorum of 3)
1035            let insufficient_validators: Vec<_> = validators.iter().take(2).cloned().collect();
1036            let insufficient_pks: Vec<_> = pks.iter().take(2).cloned().collect();
1037
1038            spawn_validator_engines::<V>(
1039                context.with_label("validator"),
1040                polynomial.clone(),
1041                &insufficient_pks,
1042                &insufficient_validators,
1043                &mut registrations,
1044                &mut automatons.lock().unwrap(),
1045                &mut reporters,
1046                &mut oracle,
1047                Duration::from_secs(3),
1048                |_| false,
1049            );
1050
1051            // With insufficient validators, consensus should not be achievable
1052            // Wait long enough for any potential consensus attempts to complete
1053            context.sleep(Duration::from_secs(12)).await;
1054
1055            // Check that no validator achieved consensus through verified threshold signatures
1056            let mut any_consensus = false;
1057            for (validator_pk, mut reporter_mailbox) in reporters {
1058                // The reporter only advances contiguous_tip when valid threshold signatures are received
1059                // and cryptographically verified. A contiguous_tip > 0 means at least one threshold
1060                // signature was successfully created and validated, which should be impossible
1061                // with insufficient validators (below quorum).
1062                let contiguous_tip = reporter_mailbox.get_contiguous_tip().await.unwrap_or(0);
1063                if contiguous_tip > 0 {
1064                    any_consensus = true;
1065                    tracing::warn!(
1066                        ?validator_pk,
1067                        contiguous_tip,
1068                        "Unexpected threshold signature consensus with insufficient validators"
1069                    );
1070                }
1071            }
1072
1073            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
1074            assert!(
1075                !any_consensus,
1076                "Consensus should not be achieved with insufficient validator participation (below quorum)"
1077            );
1078        });
1079    }
1080
1081    #[test_traced]
1082    fn test_insufficient_validators() {
1083        insufficient_validators::<MinPk>();
1084        insufficient_validators::<MinSig>();
1085    }
1086
1087    /// Test that verifies threshold signatures are mathematically correct and properly formed.
1088    fn threshold_signature_correctness<V: Variant>() {
1089        let num_validators: u32 = 4;
1090        let quorum: u32 = 3;
1091        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1092
1093        runner.start(|mut context| async move {
1094            let (polynomial, mut shares_vec) =
1095                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1096            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1097
1098            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1099                context.with_label("simulation"),
1100                num_validators,
1101                &mut shares_vec,
1102                RELIABLE_LINK,
1103            )
1104            .await;
1105            let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1106            let mut reporters =
1107                BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1108
1109            spawn_validator_engines::<V>(
1110                context.with_label("validator"),
1111                polynomial.clone(),
1112                &pks,
1113                &validators,
1114                &mut registrations,
1115                &mut automatons.lock().unwrap(),
1116                &mut reporters,
1117                &mut oracle,
1118                Duration::from_secs(5),
1119                |_| false, // No Byzantine faults for this test
1120            );
1121
1122            await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1123
1124            // Now verify that all consensus items have mathematically valid threshold signatures
1125            for (validator_pk, mut reporter_mailbox) in reporters {
1126                let tip_result = reporter_mailbox.get_tip().await;
1127                assert!(
1128                    tip_result.is_some(),
1129                    "Reporter should have achieved consensus"
1130                );
1131
1132                let (tip_index, _) = tip_result.unwrap();
1133
1134                // Check each consensus item up to the tip (starting from 1, as 0 might be genesis)
1135                for index in 1..=tip_index {
1136                    let item_result = reporter_mailbox.get(index).await;
1137                    assert!(
1138                        item_result.is_some(),
1139                        "Should have consensus item at index {index}"
1140                    );
1141
1142                    let (digest, epoch) = item_result.unwrap();
1143
1144                    // Manually verify that this item would have a valid threshold signature
1145                    // by constructing the expected item and checking signature validity
1146                    let _item = super::types::Item { index, digest };
1147                    let mut ack_namespace = b"my testing namespace".to_vec();
1148                    ack_namespace.extend_from_slice(b"_AGG_ACK");
1149
1150                    // We can't directly access the threshold signature from the reporter,
1151                    // but we can verify that the reporter's acceptance implies valid crypto.
1152                    // The reporter mock now validates threshold signatures and would panic
1153                    // if they were invalid, so reaching this point means they were valid.
1154
1155                    tracing::debug!(
1156                        ?validator_pk,
1157                        index,
1158                        epoch,
1159                        "Verified valid threshold signature for consensus item"
1160                    );
1161                }
1162            }
1163        });
1164    }
1165
1166    #[test_traced]
1167    fn test_threshold_signature_correctness() {
1168        threshold_signature_correctness::<MinPk>();
1169        threshold_signature_correctness::<MinSig>();
1170    }
1171
1172    /// Test that manually constructs and verifies threshold signatures.
1173    fn manual_threshold_verification<V: Variant>() {
1174        let runner = deterministic::Runner::timed(Duration::from_secs(10));
1175
1176        runner.start(|mut context| async move {
1177            let num_validators = 4u32;
1178            let quorum = 3u32;
1179
1180            // Generate threshold cryptography setup
1181            let (polynomial, shares_vec) =
1182                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1183
1184            // Create a test item to sign
1185            let test_item = super::types::Item {
1186                index: 42,
1187                digest: Sha256Digest::from([1u8; 32]),
1188            };
1189
1190            let namespace = b"test_namespace";
1191            let ack_namespace = [namespace.as_slice(), b"_AGG_ACK"].concat();
1192
1193            // Generate partial signatures from sufficient validators (quorum=3)
1194            let mut partial_sigs = Vec::new();
1195            for share in shares_vec.iter().take(quorum as usize) {
1196                let partial_sig = bls_ops::partial_sign_message::<V>(
1197                    share,
1198                    Some(&ack_namespace),
1199                    &test_item.encode(),
1200                );
1201                partial_sigs.push(partial_sig);
1202            }
1203
1204            // Recover partial signatures into threshold signature
1205            let threshold_sig = poly::Signature::<V>::recover(quorum, &partial_sigs).expect(
1206                "Should be able to recover threshold signature from sufficient partial signatures",
1207            );
1208
1209            // Verify the threshold signature
1210            let threshold_public = poly::public::<V>(&polynomial);
1211            let verification_result = bls_ops::verify_message::<V>(
1212                threshold_public,
1213                Some(&ack_namespace),
1214                &test_item.encode(),
1215                &threshold_sig,
1216            );
1217
1218            assert!(
1219                verification_result.is_ok(),
1220                "Manually constructed threshold signature should be valid: {:?}",
1221                verification_result.err()
1222            );
1223
1224            // Test with insufficient signatures (should fail)
1225            let insufficient_partial_sigs: Vec<_> = partial_sigs
1226                .iter()
1227                .take(quorum as usize - 1)
1228                .cloned()
1229                .collect();
1230            let insufficient_result =
1231                poly::Signature::<V>::recover(quorum, &insufficient_partial_sigs);
1232
1233            assert!(
1234                insufficient_result.is_err(),
1235                "Should not be able to recover threshold signature with insufficient partial signatures"
1236            );
1237
1238            tracing::debug!("Manual threshold signature verification completed successfully");
1239        });
1240    }
1241
1242    #[test_traced]
1243    fn test_manual_threshold_verification() {
1244        manual_threshold_verification::<MinPk>();
1245        manual_threshold_verification::<MinSig>();
1246    }
1247}