// commonware_consensus/aggregation/mod.rs
//! Recover quorum certificates over an externally synchronized sequencer of items.
//!
//! This module allows a dynamic set of participants to collectively produce quorum certificates
//! for any ordered sequence of items.
//!
//! The primary use case for this primitive is to allow blockchain validators to agree on a series
//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
//! over the output of consensus to support state sync and client balance proofs.
//!
//! _For applications that want to collect quorum certificates over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
//!
//! # Pluggable Cryptography
//!
//! The aggregation module is generic over the signing scheme, allowing users to choose the
//! cryptographic scheme best suited for their requirements:
//!
//! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification.
//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
//!
//! - [`secp256r1`][scheme::secp256r1]: Attributable signatures with individual verification.
//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
//!
//! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated
//!   verification. Produces compact certificates while preserving signer attribution.
//!
//! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures.
//!   Produces succinct constant-size certificates. Requires trusted setup (DKG).
//!
//! # Architecture
//!
//! The core of the module is the [Engine]. It manages the agreement process by:
//! - Requesting externally synchronized [commonware_cryptography::Digest]s
//! - Signing said digests with the configured scheme's signature type
//! - Multicasting signatures/shares to other validators
//! - Assembling certificates from a quorum of signatures
//! - Monitoring recovery progress and notifying the application layer of recoveries
//!
//! The engine interacts with four main components:
//! - [crate::Automaton]: Provides external digests
//! - [crate::Reporter]: Receives agreement confirmations
//! - [crate::Monitor]: Tracks epoch transitions
//! - [commonware_cryptography::certificate::Provider]: Manages validator sets and network identities
//!
//! # Design Decisions
//!
//! ## Missing Certificate Resolution
//!
//! The engine does not try to "fill gaps" when certificates are missing. When validators
//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
//! certificates may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the certificate for any skipped height.
//!
//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
//! historical results.
//!
//! ## Recovering Certificates
//!
//! In aggregation, participants never gossip recovered certificates. Rather, they gossip [types::TipAck]s
//! with signatures over some height and their latest tip. This approach reduces the overhead of running aggregation
//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
//! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest
//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).

pub mod scheme;
pub mod types;

// The engine and its supporting machinery are compiled out on wasm32 targets.
// NOTE(review): presumably because they depend on runtime facilities (journal,
// timers) unavailable under wasm — confirm against the runtime crate.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Mocks are exposed publicly (within test builds) so sibling test
        // modules can reuse them.
        #[cfg(test)]
        pub mod mocks;
    }
}
85
86#[cfg(test)]
87mod tests {
88    use super::{mocks, Config, Engine};
89    use crate::{
90        aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91        types::{Epoch, EpochDelta, Height, HeightDelta},
92    };
93    use commonware_cryptography::{
94        bls12381::primitives::variant::{MinPk, MinSig},
95        certificate::mocks::Fixture,
96        ed25519::PublicKey,
97        sha256::Digest as Sha256Digest,
98    };
99    use commonware_macros::{select, test_group, test_traced};
100    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101    use commonware_parallel::Sequential;
102    use commonware_runtime::{
103        buffer::paged::CacheRef,
104        deterministic::{self, Context},
105        Clock, Metrics, Quota, Runner, Spawner,
106    };
107    use commonware_utils::{
108        channel::{fallible::OneshotExt, oneshot},
109        test_rng, NZUsize, NonZeroDuration, NZU16,
110    };
111    use futures::future::join_all;
112    use rand::{rngs::StdRng, Rng};
113    use std::{
114        collections::BTreeMap,
115        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116        time::Duration,
117    };
118    use tracing::debug;
119
    /// Mapping from a participant's public key to the network channel pair
    /// registered for it with the simulated network oracle.
    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

    // Page size for the journal page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    // Number of pages held by the journal page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Effectively-unlimited rate quota so tests are never throttled.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
    // Namespace used to domain-separate signatures produced in tests.
    const TEST_NAMESPACE: &[u8] = b"my testing namespace";

    /// Reliable network link configuration for testing.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
133
134    /// Register all participants with the network oracle.
135    async fn register_participants(
136        oracle: &mut Oracle<PublicKey, deterministic::Context>,
137        participants: &[PublicKey],
138    ) -> Registrations<PublicKey> {
139        let mut registrations = BTreeMap::new();
140        for participant in participants.iter() {
141            let (sender, receiver) = oracle
142                .control(participant.clone())
143                .register(0, TEST_QUOTA)
144                .await
145                .unwrap();
146            registrations.insert(participant.clone(), (sender, receiver));
147        }
148        registrations
149    }
150
151    /// Establish network links between all participants.
152    async fn link_participants(
153        oracle: &mut Oracle<PublicKey, deterministic::Context>,
154        participants: &[PublicKey],
155        link: Link,
156    ) {
157        for v1 in participants.iter() {
158            for v2 in participants.iter() {
159                if v2 == v1 {
160                    continue;
161                }
162                oracle
163                    .add_link(v1.clone(), v2.clone(), link.clone())
164                    .await
165                    .unwrap();
166            }
167        }
168    }
169
170    /// Initialize a simulated network environment.
171    async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172        context: Context,
173        fixture: &Fixture<S>,
174        link: Link,
175    ) -> (
176        Oracle<PublicKey, deterministic::Context>,
177        Registrations<PublicKey>,
178    ) {
179        let (network, mut oracle) = Network::new_with_peers(
180            context.with_label("network"),
181            commonware_p2p::simulated::Config {
182                max_size: 1024 * 1024,
183                disconnect_on_block: true,
184                tracked_peer_sets: NZUsize!(1),
185            },
186            fixture.participants.clone(),
187        )
188        .await;
189        network.start();
190
191        let registrations = register_participants(&mut oracle, &fixture.participants).await;
192        link_participants(&mut oracle, &fixture.participants, link).await;
193
194        (oracle, registrations)
195    }
196
    /// Spawn aggregation engines for all validators.
    ///
    /// For each participant in `fixture`: registers its signing scheme for
    /// `epoch` with a fresh mock provider, spawns a reporter, and starts an
    /// [Engine] on the channels taken out of `registrations`. Validators whose
    /// index appears in `incorrect` run the `Incorrect` automaton strategy
    /// (byzantine digests); all others run `Correct`.
    ///
    /// Returns one reporter mailbox per participant, keyed by public key, so
    /// callers can observe each validator's reported progress.
    fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
        context: Context,
        fixture: &Fixture<S>,
        registrations: &mut Registrations<PublicKey>,
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        epoch: Epoch,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
        let mut reporters = BTreeMap::new();

        for (idx, participant) in fixture.participants.iter().enumerate() {
            let context = context.with_label(&format!("participant_{participant}"));

            // Create Provider and register scheme for epoch
            let provider = mocks::Provider::new();
            assert!(provider.register(epoch, fixture.schemes[idx].clone()));

            // Create monitor
            let monitor = mocks::Monitor::new(epoch);

            // Create automaton with Incorrect strategy for byzantine validators
            let strategy = if incorrect.contains(&idx) {
                mocks::Strategy::Incorrect
            } else {
                mocks::Strategy::Correct
            };
            let automaton = mocks::Application::new(strategy);

            // Create reporter with verifier scheme
            let (reporter, reporter_mailbox) =
                mocks::Reporter::new(context.clone(), fixture.verifier.clone());
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(participant.clone(), reporter_mailbox.clone());

            // Create blocker
            let blocker = oracle.control(participant.clone());

            // Create and start engine
            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    provider,
                    automaton,
                    reporter: reporter_mailbox,
                    blocker,
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: HeightDelta::new(100),
                    // Per-participant partition keeps journals isolated.
                    journal_partition: format!("aggregation-{participant}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    strategy: Sequential,
                },
            );

            // Hand the engine its pre-registered network channels.
            let (sender, receiver) = registrations.remove(participant).unwrap();
            engine.start((sender, receiver));
        }

        reporters
    }
266
    /// Wait for all reporters to reach the specified consensus threshold.
    ///
    /// Spawns one watcher task per reporter; each polls the reporter's tip every
    /// 100ms and fires a oneshot once both `threshold_height` and
    /// `threshold_epoch` are reached. Blocks until every watcher signals, and
    /// panics if any watcher was cancelled instead of completing.
    async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
        context: Context,
        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
        threshold_height: Height,
        threshold_epoch: Epoch,
    ) {
        let mut receivers = Vec::new();
        for (reporter, mailbox) in reporters.iter() {
            // Create a oneshot channel to signal when the reporter has reached the threshold.
            let (tx, rx) = oneshot::channel();
            receivers.push(rx);

            context.with_label("reporter_watcher").spawn({
                let reporter = reporter.clone();
                let mut mailbox = mailbox.clone();
                move |context| async move {
                    loop {
                        // A reporter with no tip yet is treated as (0, 0).
                        let (height, epoch) = mailbox
                            .get_tip()
                            .await
                            .unwrap_or((Height::zero(), Epoch::zero()));
                        debug!(
                            %height,
                            epoch = %epoch,
                            %threshold_height,
                            threshold_epoch = %threshold_epoch,
                            ?reporter,
                            "reporter status"
                        );
                        if height >= threshold_height && epoch >= threshold_epoch {
                            debug!(
                                ?reporter,
                                "reporter reached threshold, signaling completion"
                            );
                            tx.send_lossy(reporter.clone());
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });
        }

        // Wait for all oneshot receivers to complete.
        let results = join_all(receivers).await;
        assert_eq!(results.len(), reporters.len());

        // Check that none were cancelled.
        for result in results {
            assert!(result.is_ok(), "reporter was cancelled");
        }
    }
320
321    /// Test aggregation consensus with all validators online.
322    fn all_online<S, F>(fixture: F)
323    where
324        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
325        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
326    {
327        let runner = deterministic::Runner::timed(Duration::from_secs(30));
328
329        runner.start(|mut context| async move {
330            let num_validators = 4;
331            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
332            let epoch = Epoch::new(111);
333
334            let (mut oracle, mut registrations) =
335                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
336                    .await;
337
338            let reporters = spawn_validator_engines(
339                context.with_label("validator"),
340                &fixture,
341                &mut registrations,
342                &mut oracle,
343                epoch,
344                Duration::from_secs(5),
345                vec![],
346            );
347
348            await_reporters(
349                context.with_label("reporter"),
350                &reporters,
351                Height::new(100),
352                epoch,
353            )
354            .await;
355        });
356    }
357
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_all_online() {
        // Exercise the same all-online scenario under every supported signing
        // scheme (both BLS variants for threshold and multisig).
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
        all_online(secp256r1::fixture);
    }
368
369    /// Test consensus resilience to Byzantine behavior.
370    fn byzantine_proposer<S, F>(fixture: F)
371    where
372        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
373        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
374    {
375        let runner = deterministic::Runner::timed(Duration::from_secs(30));
376
377        runner.start(|mut context| async move {
378            let num_validators = 4;
379            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
380            let epoch = Epoch::new(111);
381
382            let (mut oracle, mut registrations) =
383                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
384                    .await;
385
386            let reporters = spawn_validator_engines(
387                context.with_label("validator"),
388                &fixture,
389                &mut registrations,
390                &mut oracle,
391                epoch,
392                Duration::from_secs(5),
393                vec![0],
394            );
395
396            await_reporters(
397                context.with_label("reporter"),
398                &reporters,
399                Height::new(100),
400                epoch,
401            )
402            .await;
403        });
404    }
405
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        // Run the byzantine-proposer scenario under every supported signing
        // scheme (both BLS variants for threshold and multisig).
        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
        byzantine_proposer(ed25519::fixture);
        byzantine_proposer(secp256r1::fixture);
    }
415
    /// Repeatedly run aggregation with one byzantine validator while simulating
    /// unclean shutdowns of the deterministic runtime at random points.
    ///
    /// A shared reporter is repopulated on every restart via journal replay; the
    /// test passes once the reporter holds a contiguous certificate history up
    /// to `target_height` after at least `min_shutdowns` interruptions. Each
    /// restart resumes from the previous runtime checkpoint, so network state
    /// and journals persist across runs.
    fn unclean_byzantine_shutdown<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        // Test parameters
        let num_validators = 4;
        let target_height = Height::new(200); // Target multiple rounds of signing
        let min_shutdowns = 4; // Minimum number of shutdowns per validator
        let max_shutdowns = 10; // Maximum number of shutdowns per validator
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);
        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        let mut prev_checkpoint = None;

        // Generate fixture once (persists across restarts)
        let mut rng = test_rng();
        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);

        // Continue until shared reporter reaches target or max shutdowns exceeded
        let mut shutdown_count = 0;
        while shutdown_count < max_shutdowns {
            let fixture = fixture.clone();
            // One run of the simulation; returns true on clean completion,
            // false when cut short by the simulated unclean shutdown.
            let f = move |mut context: Context| {
                async move {
                    let epoch = Epoch::new(111);

                    let (oracle, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        &fixture,
                        RELIABLE_LINK,
                    )
                    .await;

                    // Create a shared reporter
                    //
                    // We rely on replay to populate this reporter with a contiguous history of certificates.
                    let (reporter, mut reporter_mailbox) =
                        mocks::Reporter::new(context.clone(), fixture.verifier.clone());
                    context.with_label("reporter").spawn(|_| reporter.run());

                    // Spawn validator engines
                    for (idx, participant) in fixture.participants.iter().enumerate() {
                        let validator_context =
                            context.with_label(&format!("participant_{participant}"));

                        // Create Provider and register scheme for epoch
                        let provider = mocks::Provider::new();
                        assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                        // Create monitor
                        let monitor = mocks::Monitor::new(epoch);

                        // Create automaton (validator 0 is Byzantine)
                        let strategy = if idx == 0 {
                            mocks::Strategy::Incorrect
                        } else {
                            mocks::Strategy::Correct
                        };
                        let automaton = mocks::Application::new(strategy);

                        // Create blocker
                        let blocker = oracle.control(participant.clone());

                        // Create and start engine
                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                provider,
                                automaton,
                                reporter: reporter_mailbox.clone(),
                                blocker,
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                activity_timeout: HeightDelta::new(1_024), // ensure we don't drop any certificates
                                journal_partition: format!("unclean_shutdown_test_{participant}"),
                                journal_write_buffer: NZUsize!(4096),
                                journal_replay_buffer: NZUsize!(4096),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                                journal_page_cache: CacheRef::from_pooler(
                                    &context,
                                    PAGE_SIZE,
                                    PAGE_CACHE_SIZE,
                                ),
                                strategy: Sequential,
                            },
                        );

                        let (sender, receiver) = registrations.remove(participant).unwrap();
                        engine.start((sender, receiver));
                    }

                    // Create a single completion watcher for the shared reporter
                    let completion =
                        context
                            .with_label("completion_watcher")
                            .spawn(move |context| async move {
                                loop {
                                    if let Some(tip_height) =
                                        reporter_mailbox.get_contiguous_tip().await
                                    {
                                        if tip_height >= target_height {
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            });

                    // Random shutdown timing to simulate unclean shutdown
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            false // Unclean shutdown
                        },
                        _ = completion => {
                            debug!("Shared reporter completed normally");
                            true // Clean completion
                        },
                    }
                }
            };

            // First iteration starts fresh; later iterations resume from the
            // checkpoint captured when the previous run was interrupted.
            let (complete, checkpoint) = prev_checkpoint
                .map_or_else(
                    || {
                        debug!("Starting initial run");
                        deterministic::Runner::timed(Duration::from_secs(45))
                    },
                    |prev_checkpoint| {
                        debug!(shutdown_count, "Restarting from previous context");
                        deterministic::Runner::from(prev_checkpoint)
                    },
                )
                .start_and_recover(f);

            // Only finish once we completed cleanly AND survived enough
            // interruptions to make the replay path meaningful.
            if complete && shutdown_count >= min_shutdowns {
                debug!("Test completed successfully");
                break;
            }

            prev_checkpoint = Some(checkpoint);
            shutdown_count += 1;
        }
    }
567
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        // Run the unclean-shutdown scenario under every supported signing
        // scheme (both BLS variants for threshold and multisig).
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(ed25519::fixture);
        unclean_byzantine_shutdown(secp256r1::fixture);
    }
578
579    fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
580    where
581        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
582        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
583    {
584        // Test parameters
585        let num_validators = 4;
586        let skip_height = Height::new(50); // Height where no one will sign
587        let window = HeightDelta::new(10);
588        let target_height = Height::new(100);
589
590        // Generate fixture once (persists across restarts)
591        let mut rng = test_rng();
592        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
593
594        // First run: let validators skip signing at skip_height and reach beyond it
595        let f = |context: Context| {
596            let fixture = fixture.clone();
597            async move {
598                let epoch = Epoch::new(111);
599
600                // Set up simulated network
601                let (oracle, mut registrations) = initialize_simulation(
602                    context.with_label("simulation"),
603                    &fixture,
604                    RELIABLE_LINK,
605                )
606                .await;
607
608                // Create a shared reporter
609                let (reporter, mut reporter_mailbox) =
610                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
611                context.with_label("reporter").spawn(|_| reporter.run());
612
613                // Start validator engines with Skip strategy for skip_height
614                for (idx, participant) in fixture.participants.iter().enumerate() {
615                    let validator_context =
616                        context.with_label(&format!("participant_{participant}"));
617
618                    // Create Provider and register scheme for epoch
619                    let provider = mocks::Provider::new();
620                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
621
622                    // Create monitor
623                    let monitor = mocks::Monitor::new(epoch);
624
625                    // All validators use Skip strategy for skip_height
626                    let automaton = mocks::Application::new(mocks::Strategy::Skip {
627                        height: skip_height,
628                    });
629
630                    // Create blocker
631                    let blocker = oracle.control(participant.clone());
632
633                    // Create and start engine
634                    let engine = Engine::new(
635                        validator_context.with_label("engine"),
636                        Config {
637                            monitor,
638                            provider,
639                            automaton,
640                            reporter: reporter_mailbox.clone(),
641                            blocker,
642                            priority_acks: false,
643                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
644                                100,
645                            )),
646                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
647                            window: std::num::NonZeroU64::new(window.get()).unwrap(),
648                            activity_timeout: HeightDelta::new(100),
649                            journal_partition: format!("unsigned_height_test_{participant}"),
650                            journal_write_buffer: NZUsize!(4096),
651                            journal_replay_buffer: NZUsize!(4096),
652                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
653                            journal_compression: Some(3),
654                            journal_page_cache: CacheRef::from_pooler(
655                                &context,
656                                PAGE_SIZE,
657                                PAGE_CACHE_SIZE,
658                            ),
659                            strategy: Sequential,
660                        },
661                    );
662
663                    let (sender, receiver) = registrations.remove(participant).unwrap();
664                    engine.start((sender, receiver));
665                }
666
667                // Wait for validators to reach target_height (past skip_height)
668                loop {
669                    if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
670                        debug!(%tip_height, %skip_height, %target_height, "reporter status");
671                        if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
672                            // max we can proceed before item confirmed
673                            return;
674                        }
675                    }
676                    context.sleep(Duration::from_millis(50)).await;
677                }
678            }
679        };
680
681        let (_, checkpoint) =
682            deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
683
684        // Second run: restart and verify the skip_height gets confirmed
685        let f2 = |context: Context| {
686            async move {
687                let epoch = Epoch::new(111);
688
689                // Set up simulated network
690                let (oracle, mut registrations) = initialize_simulation(
691                    context.with_label("simulation"),
692                    &fixture,
693                    RELIABLE_LINK,
694                )
695                .await;
696
697                // Create a shared reporter
698                let (reporter, mut reporter_mailbox) =
699                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
700                context.with_label("reporter").spawn(|_| reporter.run());
701
702                // Start validator engines with Correct strategy (will sign everything now)
703                for (idx, participant) in fixture.participants.iter().enumerate() {
704                    let validator_context =
705                        context.with_label(&format!("participant_{participant}"));
706
707                    // Create Provider and register scheme for epoch
708                    let provider = mocks::Provider::new();
709                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
710
711                    // Create monitor
712                    let monitor = mocks::Monitor::new(epoch);
713
714                    // Now all validators use Correct strategy
715                    let automaton = mocks::Application::new(mocks::Strategy::Correct);
716
717                    // Create blocker
718                    let blocker = oracle.control(participant.clone());
719
720                    // Create and start engine
721                    let engine = Engine::new(
722                        validator_context.with_label("engine"),
723                        Config {
724                            monitor,
725                            provider,
726                            automaton,
727                            reporter: reporter_mailbox.clone(),
728                            blocker,
729                            priority_acks: false,
730                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
731                                100,
732                            )),
733                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
734                            window: std::num::NonZeroU64::new(10).unwrap(),
735                            activity_timeout: HeightDelta::new(100),
736                            journal_partition: format!("unsigned_height_test_{participant}"),
737                            journal_write_buffer: NZUsize!(4096),
738                            journal_replay_buffer: NZUsize!(4096),
739                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
740                            journal_compression: Some(3),
741                            journal_page_cache: CacheRef::from_pooler(
742                                &context,
743                                PAGE_SIZE,
744                                PAGE_CACHE_SIZE,
745                            ),
746                            strategy: Sequential,
747                        },
748                    );
749
750                    let (sender, receiver) = registrations.remove(participant).unwrap();
751                    engine.start((sender, receiver));
752                }
753
754                // Wait for skip_height to be confirmed (should happen on replay)
755                loop {
756                    if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
757                        debug!(
758                            %tip_height,
759                            %skip_height, %target_height, "reporter status on restart"
760                        );
761                        if tip_height >= target_height {
762                            break;
763                        }
764                    }
765                    context.sleep(Duration::from_millis(50)).await;
766                }
767            }
768        };
769
770        deterministic::Runner::from(checkpoint).start(f2);
771    }
772
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_height() {
        // Exercise the unclean-shutdown-with-unsigned-height scenario under every
        // supported signing scheme: BLS threshold and BLS multisig (both MinPk
        // and MinSig variants), ed25519, and secp256r1.
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(ed25519::fixture);
        unclean_shutdown_with_unsigned_height(secp256r1::fixture);
    }
783
784    fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
785    where
786        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
787        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
788    {
789        let cfg = deterministic::Config::new()
790            .with_seed(seed)
791            .with_timeout(Some(Duration::from_secs(120)));
792        let runner = deterministic::Runner::new(cfg);
793
794        runner.start(|mut context| async move {
795            let num_validators = 4;
796            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
797            let epoch = Epoch::new(111);
798
799            // Use degraded network links with realistic conditions
800            let degraded_link = Link {
801                latency: Duration::from_millis(200),
802                jitter: Duration::from_millis(150),
803                success_rate: 0.5,
804            };
805
806            let (mut oracle, mut registrations) =
807                initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
808                    .await;
809
810            let reporters = spawn_validator_engines(
811                context.with_label("validator"),
812                &fixture,
813                &mut registrations,
814                &mut oracle,
815                epoch,
816                Duration::from_secs(2),
817                vec![],
818            );
819
820            await_reporters(
821                context.with_label("reporter"),
822                &reporters,
823                Height::new(100),
824                epoch,
825            )
826            .await;
827
828            context.auditor().state()
829        })
830    }
831
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        // Run the degraded-network scenario (seed 0) for every supported
        // signing scheme.
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
        slow_and_lossy_links(secp256r1::fixture, 0);
    }
842
843    #[test_group("slow")]
844    #[test_traced("INFO")]
845    fn test_determinism() {
846        // We use slow and lossy links as the deterministic test
847        // because it is the most complex test.
848        for seed in 1..6 {
849            // Test BLS threshold MinPk
850            let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
851            let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
852            assert_eq!(ts_pk_state_1, ts_pk_state_2);
853
854            // Test BLS threshold MinSig
855            let ts_sig_state_1 =
856                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
857            let ts_sig_state_2 =
858                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
859            assert_eq!(ts_sig_state_1, ts_sig_state_2);
860
861            // Test BLS multisig MinPk
862            let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
863            let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
864            assert_eq!(ms_pk_state_1, ms_pk_state_2);
865
866            // Test BLS multisig MinSig
867            let ms_sig_state_1 =
868                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
869            let ms_sig_state_2 =
870                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
871            assert_eq!(ms_sig_state_1, ms_sig_state_2);
872
873            // Test ed25519
874            let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
875            let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
876            assert_eq!(ed_state_1, ed_state_2);
877
878            // Test secp256r1
879            let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
880            let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
881            assert_eq!(secp_state_1, secp_state_2);
882
883            let states = [
884                ("threshold-minpk", ts_pk_state_1),
885                ("threshold-minsig", ts_sig_state_1),
886                ("multisig-minpk", ms_pk_state_1),
887                ("multisig-minsig", ms_sig_state_1),
888                ("ed25519", ed_state_1),
889                ("secp256r1", secp_state_1),
890            ];
891
892            // Sanity check that different types can't be identical
893            for pair in states.windows(2) {
894                assert_ne!(
895                    pair[0].1, pair[1].1,
896                    "state {} equals state {}",
897                    pair[0].0, pair[1].0
898                );
899            }
900        }
901    }
902
903    fn one_offline<S, F>(fixture: F)
904    where
905        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
906        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
907    {
908        let runner = deterministic::Runner::timed(Duration::from_secs(30));
909
910        runner.start(|mut context| async move {
911            let num_validators = 5;
912            let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
913            let epoch = Epoch::new(111);
914
915            // Truncate to only 4 validators (one offline)
916            fixture.participants.truncate(4);
917            fixture.schemes.truncate(4);
918
919            let (mut oracle, mut registrations) =
920                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
921                    .await;
922
923            let reporters = spawn_validator_engines(
924                context.with_label("validator"),
925                &fixture,
926                &mut registrations,
927                &mut oracle,
928                epoch,
929                Duration::from_secs(5),
930                vec![],
931            );
932
933            await_reporters(
934                context.with_label("reporter"),
935                &reporters,
936                Height::new(100),
937                epoch,
938            )
939            .await;
940        });
941    }
942
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_one_offline() {
        // Run the one-validator-offline scenario for every supported signing
        // scheme.
        one_offline(bls12381_threshold::fixture::<MinPk, _>);
        one_offline(bls12381_threshold::fixture::<MinSig, _>);
        one_offline(bls12381_multisig::fixture::<MinPk, _>);
        one_offline(bls12381_multisig::fixture::<MinSig, _>);
        one_offline(ed25519::fixture);
        one_offline(secp256r1::fixture);
    }
953
954    /// Test consensus recovery after a network partition.
955    fn network_partition<S, F>(fixture: F)
956    where
957        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
958        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
959    {
960        let runner = deterministic::Runner::timed(Duration::from_secs(60));
961
962        runner.start(|mut context| async move {
963            let num_validators = 4;
964            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
965            let epoch = Epoch::new(111);
966
967            let (mut oracle, mut registrations) =
968                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
969                    .await;
970
971            let reporters = spawn_validator_engines(
972                context.with_label("validator"),
973                &fixture,
974                &mut registrations,
975                &mut oracle,
976                epoch,
977                Duration::from_secs(5),
978                vec![],
979            );
980
981            // Partition network (remove all links)
982            for v1 in fixture.participants.iter() {
983                for v2 in fixture.participants.iter() {
984                    if v2 == v1 {
985                        continue;
986                    }
987                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
988                }
989            }
990            context.sleep(Duration::from_secs(20)).await;
991
992            // Restore network links
993            for v1 in fixture.participants.iter() {
994                for v2 in fixture.participants.iter() {
995                    if v2 == v1 {
996                        continue;
997                    }
998                    oracle
999                        .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
1000                        .await
1001                        .unwrap();
1002                }
1003            }
1004
1005            await_reporters(
1006                context.with_label("reporter"),
1007                &reporters,
1008                Height::new(100),
1009                epoch,
1010            )
1011            .await;
1012        });
1013    }
1014
    #[test_traced("INFO")]
    fn test_network_partition() {
        // Run the partition-and-heal scenario for every supported signing
        // scheme.
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
        network_partition(secp256r1::fixture);
    }
1024
1025    /// Test insufficient validator participation (below quorum).
1026    fn insufficient_validators<S, F>(fixture: F)
1027    where
1028        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1029        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1030    {
1031        let runner = deterministic::Runner::timed(Duration::from_secs(15));
1032
1033        runner.start(|mut context| async move {
1034            let num_validators = 5;
1035            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
1036            let epoch = Epoch::new(111);
1037
1038            // Set up simulated network
1039            let (oracle, mut registrations) =
1040                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
1041                    .await;
1042
1043            // Create reporters (one per online validator)
1044            let mut reporters =
1045                BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
1046
1047            // Start only 2 out of 5 validators (below quorum of 3)
1048            for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
1049                let context = context.with_label(&format!("participant_{participant}"));
1050
1051                // Create Provider and register scheme for epoch
1052                let provider = mocks::Provider::new();
1053                assert!(provider.register(epoch, fixture.schemes[idx].clone()));
1054
1055                // Create monitor
1056                let monitor = mocks::Monitor::new(epoch);
1057
1058                // Create automaton with Correct strategy
1059                let automaton = mocks::Application::new(mocks::Strategy::Correct);
1060
1061                // Create reporter with verifier scheme
1062                let (reporter, reporter_mailbox) =
1063                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
1064                context.with_label("reporter").spawn(|_| reporter.run());
1065                reporters.insert(participant.clone(), reporter_mailbox.clone());
1066
1067                // Create blocker
1068                let blocker = oracle.control(participant.clone());
1069
1070                // Create and start engine
1071                let engine = Engine::new(
1072                    context.with_label("engine"),
1073                    Config {
1074                        monitor,
1075                        provider,
1076                        automaton,
1077                        reporter: reporter_mailbox,
1078                        blocker,
1079                        priority_acks: false,
1080                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1081                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1082                        window: std::num::NonZeroU64::new(10).unwrap(),
1083                        activity_timeout: HeightDelta::new(100),
1084                        journal_partition: format!("aggregation-{participant}"),
1085                        journal_write_buffer: NZUsize!(4096),
1086                        journal_replay_buffer: NZUsize!(4096),
1087                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1088                        journal_compression: Some(3),
1089                        journal_page_cache: CacheRef::from_pooler(
1090                            &context,
1091                            PAGE_SIZE,
1092                            PAGE_CACHE_SIZE,
1093                        ),
1094                        strategy: Sequential,
1095                    },
1096                );
1097
1098                let (sender, receiver) = registrations.remove(participant).unwrap();
1099                engine.start((sender, receiver));
1100            }
1101
1102            // With insufficient validators, consensus should not be achievable
1103            // Wait long enough for any potential consensus attempts to complete
1104            context.sleep(Duration::from_secs(12)).await;
1105
1106            // Check that no validator achieved consensus
1107            let mut any_consensus = false;
1108            for (validator_pk, mut reporter_mailbox) in reporters {
1109                let (tip, _) = reporter_mailbox
1110                    .get_tip()
1111                    .await
1112                    .unwrap_or((Height::zero(), Epoch::zero()));
1113                if !tip.is_zero() {
1114                    any_consensus = true;
1115                    tracing::warn!(
1116                        ?validator_pk,
1117                        %tip,
1118                        "Unexpected consensus with insufficient validators"
1119                    );
1120                }
1121            }
1122
1123            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
1124            assert!(
1125                !any_consensus,
1126                "Consensus should not be achieved with insufficient validator participation (below quorum)"
1127            );
1128        });
1129    }
1130
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        // Run the below-quorum scenario for every supported signing scheme.
        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
        insufficient_validators(ed25519::fixture);
        insufficient_validators(secp256r1::fixture);
    }
1140}