Skip to main content

commonware_consensus/aggregation/
mod.rs

//! Recover quorum certificates over an externally synchronized sequencer of items.
//!
//! This module allows a dynamic set of participants to collectively produce quorum certificates
//! for any ordered sequence of items.
//!
//! The primary use case for this primitive is to allow blockchain validators to agree on a series
//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
//! over the output of consensus to support state sync and client balance proofs.
//!
//! _For applications that want to collect quorum certificates over concurrent, sequencer-driven broadcast,
//! check out [crate::ordered_broadcast]._
//!
//! # Pluggable Cryptography
//!
//! The aggregation module is generic over the signing scheme, allowing users to choose the
//! cryptographic scheme best suited for their requirements:
//!
//! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification.
//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
//!
//! - [`secp256r1`][scheme::secp256r1]: Attributable signatures with individual verification.
//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
//!
//! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated
//!   verification. Produces compact certificates while preserving signer attribution.
//!
//! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures.
//!   Produces succinct constant-size certificates. Requires trusted setup (DKG).
//!
//! # Architecture
//!
//! The core of the module is the [Engine]. It manages the agreement process by:
//! - Requesting externally synchronized [commonware_cryptography::Digest]s
//! - Signing said digests with the configured scheme's signature type
//! - Multicasting signatures/shares to other validators
//! - Assembling certificates from a quorum of signatures
//! - Monitoring recovery progress and notifying the application layer of recoveries
//!
//! The engine interacts with four main components:
//! - [crate::Automaton]: Provides external digests
//! - [crate::Reporter]: Receives agreement confirmations
//! - [crate::Monitor]: Tracks epoch transitions
//! - [commonware_cryptography::certificate::Provider]: Manages validator sets and network identities
//!
//! # Design Decisions
//!
//! ## Missing Certificate Resolution
//!
//! The engine does not try to "fill gaps" when certificates are missing. When validators
//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
//! certificates may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the certificate for any skipped height.
//!
//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
//! historical results.
//!
//! ## Recovering Certificates
//!
//! In aggregation, participants never gossip recovered certificates. Rather, they gossip [types::TipAck]s
//! with signatures over some height and their latest tip. This approach reduces the overhead of running aggregation
//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
//! drift of online participants (even if all participants are synchronous, the tip advancement logic will advance to the `f+1`th highest
//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).

69pub mod scheme;
70pub mod types;
71
72cfg_if::cfg_if! {
73    if #[cfg(not(target_arch = "wasm32"))] {
74        mod config;
75        pub use config::Config;
76        mod engine;
77        pub use engine::Engine;
78        mod metrics;
79        mod safe_tip;
80
81        #[cfg(test)]
82        pub mod mocks;
83    }
84}
85
86#[cfg(test)]
87mod tests {
88    use super::{mocks, Config, Engine};
89    use crate::{
90        aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91        types::{Epoch, EpochDelta, Height, HeightDelta},
92    };
93    use commonware_cryptography::{
94        bls12381::primitives::variant::{MinPk, MinSig},
95        certificate::mocks::Fixture,
96        ed25519::PublicKey,
97        sha256::Digest as Sha256Digest,
98    };
99    use commonware_macros::{select, test_group, test_traced};
100    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101    use commonware_parallel::Sequential;
102    use commonware_runtime::{
103        buffer::paged::CacheRef,
104        deterministic::{self, Context},
105        Clock, Quota, Runner, Spawner, Supervisor as _,
106    };
107    use commonware_utils::{
108        channel::{fallible::OneshotExt, oneshot},
109        test_rng, NZUsize, NonZeroDuration, NZU16,
110    };
111    use futures::future::join_all;
112    use rand::{rngs::StdRng, Rng};
113    use std::{
114        collections::BTreeMap,
115        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116        time::Duration,
117    };
118    use tracing::debug;
119
120    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;
121
122    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
123    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
124    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
125    const TEST_NAMESPACE: &[u8] = b"my testing namespace";
126
127    /// Reliable network link configuration for testing.
128    const RELIABLE_LINK: Link = Link {
129        latency: Duration::from_millis(10),
130        jitter: Duration::from_millis(1),
131        success_rate: 1.0,
132    };
133
134    /// Register all participants with the network oracle.
135    async fn register_participants(
136        oracle: &mut Oracle<PublicKey, deterministic::Context>,
137        participants: &[PublicKey],
138    ) -> Registrations<PublicKey> {
139        let mut registrations = BTreeMap::new();
140        for participant in participants.iter() {
141            let (sender, receiver) = oracle
142                .control(participant.clone())
143                .register(0, TEST_QUOTA)
144                .await
145                .unwrap();
146            registrations.insert(participant.clone(), (sender, receiver));
147        }
148        registrations
149    }
150
151    /// Establish network links between all participants.
152    async fn link_participants(
153        oracle: &mut Oracle<PublicKey, deterministic::Context>,
154        participants: &[PublicKey],
155        link: Link,
156    ) {
157        for v1 in participants.iter() {
158            for v2 in participants.iter() {
159                if v2 == v1 {
160                    continue;
161                }
162                oracle
163                    .add_link(v1.clone(), v2.clone(), link.clone())
164                    .await
165                    .unwrap();
166            }
167        }
168    }
169
170    /// Initialize a simulated network environment.
171    async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172        context: Context,
173        fixture: &Fixture<S>,
174        link: Link,
175    ) -> (
176        Oracle<PublicKey, deterministic::Context>,
177        Registrations<PublicKey>,
178    ) {
179        let (network, mut oracle) = Network::new_with_peers(
180            context.child("network"),
181            commonware_p2p::simulated::Config {
182                max_size: 1024 * 1024,
183                disconnect_on_block: true,
184                tracked_peer_sets: NZUsize!(1),
185            },
186            fixture.participants.clone(),
187        )
188        .await;
189        network.start();
190
191        let registrations = register_participants(&mut oracle, &fixture.participants).await;
192        link_participants(&mut oracle, &fixture.participants, link).await;
193
194        (oracle, registrations)
195    }
196
197    /// Spawn aggregation engines for all validators.
198    fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
199        context: Context,
200        fixture: &Fixture<S>,
201        registrations: &mut Registrations<PublicKey>,
202        oracle: &mut Oracle<PublicKey, deterministic::Context>,
203        epoch: Epoch,
204        rebroadcast_timeout: Duration,
205        incorrect: Vec<usize>,
206    ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
207        let mut reporters = BTreeMap::new();
208
209        for (idx, participant) in fixture.participants.iter().enumerate() {
210            let context = context
211                .child("participant")
212                .with_attribute("public_key", participant);
213
214            // Create Provider and register scheme for epoch
215            let provider = mocks::Provider::new();
216            assert!(provider.register(epoch, fixture.schemes[idx].clone()));
217
218            // Create monitor
219            let monitor = mocks::Monitor::new(epoch);
220
221            // Create automaton with Incorrect strategy for byzantine validators
222            let strategy = if incorrect.contains(&idx) {
223                mocks::Strategy::Incorrect
224            } else {
225                mocks::Strategy::Correct
226            };
227            let automaton = mocks::Application::new(strategy);
228
229            // Create reporter with verifier scheme
230            let (reporter, reporter_mailbox) =
231                mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
232            reporter.start();
233            reporters.insert(participant.clone(), reporter_mailbox.clone());
234
235            // Create blocker
236            let blocker = oracle.control(participant.clone());
237
238            // Create and start engine
239            let engine = Engine::new(
240                context.child("engine"),
241                Config {
242                    monitor,
243                    provider,
244                    automaton,
245                    reporter: reporter_mailbox,
246                    blocker,
247                    priority_acks: false,
248                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
249                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
250                    window: std::num::NonZeroU64::new(10).unwrap(),
251                    activity_timeout: HeightDelta::new(100),
252                    journal_partition: format!("aggregation-{participant}"),
253                    journal_write_buffer: NZUsize!(4096),
254                    journal_replay_buffer: NZUsize!(4096),
255                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
256                    journal_compression: Some(3),
257                    journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
258                    strategy: Sequential,
259                },
260            );
261
262            let (sender, receiver) = registrations.remove(participant).unwrap();
263            engine.start((sender, receiver));
264        }
265
266        reporters
267    }
268
269    /// Wait for all reporters to reach the specified consensus threshold.
270    async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
271        context: Context,
272        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
273        threshold_height: Height,
274        threshold_epoch: Epoch,
275    ) {
276        let mut receivers = Vec::new();
277        for (reporter, mailbox) in reporters.iter() {
278            // Create a oneshot channel to signal when the reporter has reached the threshold.
279            let (tx, rx) = oneshot::channel();
280            receivers.push(rx);
281
282            context
283                .child("reporter_watcher")
284                .with_attribute("reporter", reporter)
285                .spawn({
286                    let reporter = reporter.clone();
287                    let mut mailbox = mailbox.clone();
288                    move |context| async move {
289                        loop {
290                            let (height, epoch) = mailbox
291                                .get_tip()
292                                .await
293                                .unwrap_or((Height::zero(), Epoch::zero()));
294                            debug!(
295                                %height,
296                                epoch = %epoch,
297                                %threshold_height,
298                                threshold_epoch = %threshold_epoch,
299                                ?reporter,
300                                "reporter status"
301                            );
302                            if height >= threshold_height && epoch >= threshold_epoch {
303                                debug!(
304                                    ?reporter,
305                                    "reporter reached threshold, signaling completion"
306                                );
307                                tx.send_lossy(reporter.clone());
308                                break;
309                            }
310                            context.sleep(Duration::from_millis(100)).await;
311                        }
312                    }
313                });
314        }
315
316        // Wait for all oneshot receivers to complete.
317        let results = join_all(receivers).await;
318        assert_eq!(results.len(), reporters.len());
319
320        // Check that none were cancelled.
321        for result in results {
322            assert!(result.is_ok(), "reporter was cancelled");
323        }
324    }
325
326    /// Test aggregation consensus with all validators online.
327    fn all_online<S, F>(fixture: F)
328    where
329        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
330        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
331    {
332        let runner = deterministic::Runner::timed(Duration::from_secs(30));
333
334        runner.start(|mut context| async move {
335            let num_validators = 4;
336            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
337            let epoch = Epoch::new(111);
338
339            let (mut oracle, mut registrations) =
340                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
341
342            let reporters = spawn_validator_engines(
343                context.child("validator"),
344                &fixture,
345                &mut registrations,
346                &mut oracle,
347                epoch,
348                Duration::from_secs(5),
349                vec![],
350            );
351
352            await_reporters(
353                context.child("reporter"),
354                &reporters,
355                Height::new(100),
356                epoch,
357            )
358            .await;
359        });
360    }
361
362    #[test_group("slow")]
363    #[test_traced("INFO")]
364    fn test_all_online() {
365        all_online(bls12381_threshold::fixture::<MinPk, _>);
366        all_online(bls12381_threshold::fixture::<MinSig, _>);
367        all_online(bls12381_multisig::fixture::<MinPk, _>);
368        all_online(bls12381_multisig::fixture::<MinSig, _>);
369        all_online(ed25519::fixture);
370        all_online(secp256r1::fixture);
371    }
372
373    /// Test consensus resilience to Byzantine behavior.
374    fn byzantine_proposer<S, F>(fixture: F)
375    where
376        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
377        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
378    {
379        let runner = deterministic::Runner::timed(Duration::from_secs(30));
380
381        runner.start(|mut context| async move {
382            let num_validators = 4;
383            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
384            let epoch = Epoch::new(111);
385
386            let (mut oracle, mut registrations) =
387                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
388
389            let reporters = spawn_validator_engines(
390                context.child("validator"),
391                &fixture,
392                &mut registrations,
393                &mut oracle,
394                epoch,
395                Duration::from_secs(5),
396                vec![0],
397            );
398
399            await_reporters(
400                context.child("reporter"),
401                &reporters,
402                Height::new(100),
403                epoch,
404            )
405            .await;
406        });
407    }
408
409    #[test_traced("INFO")]
410    fn test_byzantine_proposer() {
411        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
412        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
413        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
414        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
415        byzantine_proposer(ed25519::fixture);
416        byzantine_proposer(secp256r1::fixture);
417    }
418
419    fn unclean_byzantine_shutdown<S, F>(fixture: F)
420    where
421        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
422        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
423    {
424        // Test parameters
425        let num_validators = 4;
426        let target_height = Height::new(200); // Target multiple rounds of signing
427        let min_shutdowns = 4; // Minimum number of shutdowns per validator
428        let max_shutdowns = 10; // Maximum number of shutdowns per validator
429        let shutdown_range_min = Duration::from_millis(100);
430        let shutdown_range_max = Duration::from_millis(1_000);
431        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
432
433        let mut prev_checkpoint = None;
434
435        // Generate fixture once (persists across restarts)
436        let mut rng = test_rng();
437        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
438
439        // Continue until shared reporter reaches target or max shutdowns exceeded
440        let mut shutdown_count = 0;
441        while shutdown_count < max_shutdowns {
442            let fixture = fixture.clone();
443            let f = move |mut context: Context| {
444                async move {
445                    let epoch = Epoch::new(111);
446
447                    let (oracle, mut registrations) =
448                        initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
449                            .await;
450
451                    // Create a shared reporter
452                    //
453                    // We rely on replay to populate this reporter with a contiguous history of certificates.
454                    let (reporter, mut reporter_mailbox) =
455                        mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
456                    reporter.start();
457
458                    // Spawn validator engines
459                    for (idx, participant) in fixture.participants.iter().enumerate() {
460                        let validator_context = context
461                            .child("participant")
462                            .with_attribute("public_key", participant);
463
464                        // Create Provider and register scheme for epoch
465                        let provider = mocks::Provider::new();
466                        assert!(provider.register(epoch, fixture.schemes[idx].clone()));
467
468                        // Create monitor
469                        let monitor = mocks::Monitor::new(epoch);
470
471                        // Create automaton (validator 0 is Byzantine)
472                        let strategy = if idx == 0 {
473                            mocks::Strategy::Incorrect
474                        } else {
475                            mocks::Strategy::Correct
476                        };
477                        let automaton = mocks::Application::new(strategy);
478
479                        // Create blocker
480                        let blocker = oracle.control(participant.clone());
481
482                        // Create and start engine
483                        let engine = Engine::new(
484                            validator_context.child("engine"),
485                            Config {
486                                monitor,
487                                provider,
488                                automaton,
489                                reporter: reporter_mailbox.clone(),
490                                blocker,
491                                priority_acks: false,
492                                rebroadcast_timeout,
493                                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
494                                window: std::num::NonZeroU64::new(10).unwrap(),
495                                activity_timeout: HeightDelta::new(1_024), // ensure we don't drop any certificates
496                                journal_partition: format!("unclean_shutdown_test_{participant}"),
497                                journal_write_buffer: NZUsize!(4096),
498                                journal_replay_buffer: NZUsize!(4096),
499                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
500                                journal_compression: Some(3),
501                                journal_page_cache: CacheRef::from_pooler(
502                                    &context,
503                                    PAGE_SIZE,
504                                    PAGE_CACHE_SIZE,
505                                ),
506                                strategy: Sequential,
507                            },
508                        );
509
510                        let (sender, receiver) = registrations.remove(participant).unwrap();
511                        engine.start((sender, receiver));
512                    }
513
514                    // Create a single completion watcher for the shared reporter
515                    let completion =
516                        context
517                            .child("completion_watcher")
518                            .spawn(move |context| async move {
519                                loop {
520                                    if let Some(tip_height) =
521                                        reporter_mailbox.get_contiguous_tip().await
522                                    {
523                                        if tip_height >= target_height {
524                                            break;
525                                        }
526                                    }
527                                    context.sleep(Duration::from_millis(50)).await;
528                                }
529                            });
530
531                    // Random shutdown timing to simulate unclean shutdown
532                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
533                    select! {
534                        _ = context.sleep(shutdown_wait) => {
535                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
536                            false // Unclean shutdown
537                        },
538                        _ = completion => {
539                            debug!("Shared reporter completed normally");
540                            true // Clean completion
541                        },
542                    }
543                }
544            };
545
546            let (complete, checkpoint) = prev_checkpoint
547                .map_or_else(
548                    || {
549                        debug!("Starting initial run");
550                        deterministic::Runner::timed(Duration::from_secs(45))
551                    },
552                    |prev_checkpoint| {
553                        debug!(shutdown_count, "Restarting from previous context");
554                        deterministic::Runner::from(prev_checkpoint)
555                    },
556                )
557                .start_and_recover(f);
558
559            if complete && shutdown_count >= min_shutdowns {
560                debug!("Test completed successfully");
561                break;
562            }
563
564            prev_checkpoint = Some(checkpoint);
565            shutdown_count += 1;
566        }
567    }
568
569    #[test_group("slow")]
570    #[test_traced("INFO")]
571    fn test_unclean_byzantine_shutdown() {
572        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
573        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
574        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
575        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
576        unclean_byzantine_shutdown(ed25519::fixture);
577        unclean_byzantine_shutdown(secp256r1::fixture);
578    }
579
580    fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
581    where
582        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
583        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
584    {
585        // Test parameters
586        let num_validators = 4;
587        let skip_height = Height::new(50); // Height where no one will sign
588        let window = HeightDelta::new(10);
589        let target_height = Height::new(100);
590
591        // Generate fixture once (persists across restarts)
592        let mut rng = test_rng();
593        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
594
595        // First run: let validators skip signing at skip_height and reach beyond it
596        let f = |context: Context| {
597            let fixture = fixture.clone();
598            async move {
599                let epoch = Epoch::new(111);
600
601                // Set up simulated network
602                let (oracle, mut registrations) =
603                    initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
604                        .await;
605
606                // Create a shared reporter
607                let (reporter, mut reporter_mailbox) =
608                    mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
609                reporter.start();
610
611                // Start validator engines with Skip strategy for skip_height
612                for (idx, participant) in fixture.participants.iter().enumerate() {
613                    let validator_context = context
614                        .child("participant")
615                        .with_attribute("public_key", participant);
616
617                    // Create Provider and register scheme for epoch
618                    let provider = mocks::Provider::new();
619                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
620
621                    // Create monitor
622                    let monitor = mocks::Monitor::new(epoch);
623
624                    // All validators use Skip strategy for skip_height
625                    let automaton = mocks::Application::new(mocks::Strategy::Skip {
626                        height: skip_height,
627                    });
628
629                    // Create blocker
630                    let blocker = oracle.control(participant.clone());
631
632                    // Create and start engine
633                    let engine = Engine::new(
634                        validator_context.child("engine"),
635                        Config {
636                            monitor,
637                            provider,
638                            automaton,
639                            reporter: reporter_mailbox.clone(),
640                            blocker,
641                            priority_acks: false,
642                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
643                                100,
644                            )),
645                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
646                            window: std::num::NonZeroU64::new(window.get()).unwrap(),
647                            activity_timeout: HeightDelta::new(100),
648                            journal_partition: format!("unsigned_height_test_{participant}"),
649                            journal_write_buffer: NZUsize!(4096),
650                            journal_replay_buffer: NZUsize!(4096),
651                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
652                            journal_compression: Some(3),
653                            journal_page_cache: CacheRef::from_pooler(
654                                &context,
655                                PAGE_SIZE,
656                                PAGE_CACHE_SIZE,
657                            ),
658                            strategy: Sequential,
659                        },
660                    );
661
662                    let (sender, receiver) = registrations.remove(participant).unwrap();
663                    engine.start((sender, receiver));
664                }
665
666                // Wait for validators to reach target_height (past skip_height)
667                loop {
668                    if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
669                        debug!(%tip_height, %skip_height, %target_height, "reporter status");
670                        if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
671                            // max we can proceed before item confirmed
672                            return;
673                        }
674                    }
675                    context.sleep(Duration::from_millis(50)).await;
676                }
677            }
678        };
679
680        let (_, checkpoint) =
681            deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
682
683        // Second run: restart and verify the skip_height gets confirmed
684        let f2 = |context: Context| {
685            async move {
686                let epoch = Epoch::new(111);
687
688                // Set up simulated network
689                let (oracle, mut registrations) =
690                    initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
691                        .await;
692
693                // Create a shared reporter
694                let (reporter, mut reporter_mailbox) =
695                    mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
696                reporter.start();
697
698                // Start validator engines with Correct strategy (will sign everything now)
699                for (idx, participant) in fixture.participants.iter().enumerate() {
700                    let validator_context = context
701                        .child("participant")
702                        .with_attribute("public_key", participant);
703
704                    // Create Provider and register scheme for epoch
705                    let provider = mocks::Provider::new();
706                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
707
708                    // Create monitor
709                    let monitor = mocks::Monitor::new(epoch);
710
711                    // Now all validators use Correct strategy
712                    let automaton = mocks::Application::new(mocks::Strategy::Correct);
713
714                    // Create blocker
715                    let blocker = oracle.control(participant.clone());
716
717                    // Create and start engine
718                    let engine = Engine::new(
719                        validator_context.child("engine"),
720                        Config {
721                            monitor,
722                            provider,
723                            automaton,
724                            reporter: reporter_mailbox.clone(),
725                            blocker,
726                            priority_acks: false,
727                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
728                                100,
729                            )),
730                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
731                            window: std::num::NonZeroU64::new(10).unwrap(),
732                            activity_timeout: HeightDelta::new(100),
733                            journal_partition: format!("unsigned_height_test_{participant}"),
734                            journal_write_buffer: NZUsize!(4096),
735                            journal_replay_buffer: NZUsize!(4096),
736                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
737                            journal_compression: Some(3),
738                            journal_page_cache: CacheRef::from_pooler(
739                                &context,
740                                PAGE_SIZE,
741                                PAGE_CACHE_SIZE,
742                            ),
743                            strategy: Sequential,
744                        },
745                    );
746
747                    let (sender, receiver) = registrations.remove(participant).unwrap();
748                    engine.start((sender, receiver));
749                }
750
751                // Wait for skip_height to be confirmed (should happen on replay)
752                loop {
753                    if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
754                        debug!(
755                            %tip_height,
756                            %skip_height, %target_height, "reporter status on restart"
757                        );
758                        if tip_height >= target_height {
759                            break;
760                        }
761                    }
762                    context.sleep(Duration::from_millis(50)).await;
763                }
764            }
765        };
766
767        deterministic::Runner::from(checkpoint).start(f2);
768    }
769
770    #[test_group("slow")]
771    #[test_traced("INFO")]
772    fn test_unclean_shutdown_with_unsigned_height() {
773        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
774        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
775        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
776        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
777        unclean_shutdown_with_unsigned_height(ed25519::fixture);
778        unclean_shutdown_with_unsigned_height(secp256r1::fixture);
779    }
780
781    fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
782    where
783        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
784        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
785    {
786        let cfg = deterministic::Config::new()
787            .with_seed(seed)
788            .with_timeout(Some(Duration::from_secs(120)));
789        let runner = deterministic::Runner::new(cfg);
790
791        runner.start(|mut context| async move {
792            let num_validators = 4;
793            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
794            let epoch = Epoch::new(111);
795
796            // Use degraded network links with realistic conditions
797            let degraded_link = Link {
798                latency: Duration::from_millis(200),
799                jitter: Duration::from_millis(150),
800                success_rate: 0.5,
801            };
802
803            let (mut oracle, mut registrations) =
804                initialize_simulation(context.child("simulation"), &fixture, degraded_link).await;
805
806            let reporters = spawn_validator_engines(
807                context.child("validator"),
808                &fixture,
809                &mut registrations,
810                &mut oracle,
811                epoch,
812                Duration::from_secs(2),
813                vec![],
814            );
815
816            await_reporters(
817                context.child("reporter"),
818                &reporters,
819                Height::new(100),
820                epoch,
821            )
822            .await;
823
824            context.auditor().state()
825        })
826    }
827
828    #[test_group("slow")]
829    #[test_traced("INFO")]
830    fn test_slow_and_lossy_links() {
831        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
832        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
833        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
834        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
835        slow_and_lossy_links(ed25519::fixture, 0);
836        slow_and_lossy_links(secp256r1::fixture, 0);
837    }
838
839    #[test_group("slow")]
840    #[test_traced("INFO")]
841    fn test_determinism() {
842        // We use slow and lossy links as the deterministic test
843        // because it is the most complex test.
844        for seed in 1..6 {
845            // Test BLS threshold MinPk
846            let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
847            let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
848            assert_eq!(ts_pk_state_1, ts_pk_state_2);
849
850            // Test BLS threshold MinSig
851            let ts_sig_state_1 =
852                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
853            let ts_sig_state_2 =
854                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
855            assert_eq!(ts_sig_state_1, ts_sig_state_2);
856
857            // Test BLS multisig MinPk
858            let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
859            let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
860            assert_eq!(ms_pk_state_1, ms_pk_state_2);
861
862            // Test BLS multisig MinSig
863            let ms_sig_state_1 =
864                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
865            let ms_sig_state_2 =
866                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
867            assert_eq!(ms_sig_state_1, ms_sig_state_2);
868
869            // Test ed25519
870            let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
871            let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
872            assert_eq!(ed_state_1, ed_state_2);
873
874            // Test secp256r1
875            let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
876            let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
877            assert_eq!(secp_state_1, secp_state_2);
878
879            let states = [
880                ("threshold-minpk", ts_pk_state_1),
881                ("threshold-minsig", ts_sig_state_1),
882                ("multisig-minpk", ms_pk_state_1),
883                ("multisig-minsig", ms_sig_state_1),
884                ("ed25519", ed_state_1),
885                ("secp256r1", secp_state_1),
886            ];
887
888            // Sanity check that different types can't be identical
889            for pair in states.windows(2) {
890                assert_ne!(
891                    pair[0].1, pair[1].1,
892                    "state {} equals state {}",
893                    pair[0].0, pair[1].0
894                );
895            }
896        }
897    }
898
899    fn one_offline<S, F>(fixture: F)
900    where
901        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
902        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
903    {
904        let runner = deterministic::Runner::timed(Duration::from_secs(30));
905
906        runner.start(|mut context| async move {
907            let num_validators = 5;
908            let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
909            let epoch = Epoch::new(111);
910
911            // Truncate to only 4 validators (one offline)
912            fixture.participants.truncate(4);
913            fixture.schemes.truncate(4);
914
915            let (mut oracle, mut registrations) =
916                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
917
918            let reporters = spawn_validator_engines(
919                context.child("validator"),
920                &fixture,
921                &mut registrations,
922                &mut oracle,
923                epoch,
924                Duration::from_secs(5),
925                vec![],
926            );
927
928            await_reporters(
929                context.child("reporter"),
930                &reporters,
931                Height::new(100),
932                epoch,
933            )
934            .await;
935        });
936    }
937
938    #[test_group("slow")]
939    #[test_traced("INFO")]
940    fn test_one_offline() {
941        one_offline(bls12381_threshold::fixture::<MinPk, _>);
942        one_offline(bls12381_threshold::fixture::<MinSig, _>);
943        one_offline(bls12381_multisig::fixture::<MinPk, _>);
944        one_offline(bls12381_multisig::fixture::<MinSig, _>);
945        one_offline(ed25519::fixture);
946        one_offline(secp256r1::fixture);
947    }
948
949    /// Test consensus recovery after a network partition.
950    fn network_partition<S, F>(fixture: F)
951    where
952        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
953        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
954    {
955        let runner = deterministic::Runner::timed(Duration::from_secs(60));
956
957        runner.start(|mut context| async move {
958            let num_validators = 4;
959            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
960            let epoch = Epoch::new(111);
961
962            let (mut oracle, mut registrations) =
963                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
964
965            let reporters = spawn_validator_engines(
966                context.child("validator"),
967                &fixture,
968                &mut registrations,
969                &mut oracle,
970                epoch,
971                Duration::from_secs(5),
972                vec![],
973            );
974
975            // Partition network (remove all links)
976            for v1 in fixture.participants.iter() {
977                for v2 in fixture.participants.iter() {
978                    if v2 == v1 {
979                        continue;
980                    }
981                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
982                }
983            }
984            context.sleep(Duration::from_secs(20)).await;
985
986            // Restore network links
987            for v1 in fixture.participants.iter() {
988                for v2 in fixture.participants.iter() {
989                    if v2 == v1 {
990                        continue;
991                    }
992                    oracle
993                        .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
994                        .await
995                        .unwrap();
996                }
997            }
998
999            await_reporters(
1000                context.child("reporter"),
1001                &reporters,
1002                Height::new(100),
1003                epoch,
1004            )
1005            .await;
1006        });
1007    }
1008
1009    #[test_traced("INFO")]
1010    fn test_network_partition() {
1011        network_partition(bls12381_threshold::fixture::<MinPk, _>);
1012        network_partition(bls12381_threshold::fixture::<MinSig, _>);
1013        network_partition(bls12381_multisig::fixture::<MinPk, _>);
1014        network_partition(bls12381_multisig::fixture::<MinSig, _>);
1015        network_partition(ed25519::fixture);
1016        network_partition(secp256r1::fixture);
1017    }
1018
1019    /// Test insufficient validator participation (below quorum).
1020    fn insufficient_validators<S, F>(fixture: F)
1021    where
1022        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1023        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1024    {
1025        let runner = deterministic::Runner::timed(Duration::from_secs(15));
1026
1027        runner.start(|mut context| async move {
1028            let num_validators = 5;
1029            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
1030            let epoch = Epoch::new(111);
1031
1032            // Set up simulated network
1033            let (oracle, mut registrations) =
1034                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
1035                    .await;
1036
1037            // Create reporters (one per online validator)
1038            let mut reporters =
1039                BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
1040
1041            // Start only 2 out of 5 validators (below quorum of 3)
1042            for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
1043                let context = context.child("participant").with_attribute("public_key", participant);
1044
1045                // Create Provider and register scheme for epoch
1046                let provider = mocks::Provider::new();
1047                assert!(provider.register(epoch, fixture.schemes[idx].clone()));
1048
1049                // Create monitor
1050                let monitor = mocks::Monitor::new(epoch);
1051
1052                // Create automaton with Correct strategy
1053                let automaton = mocks::Application::new(mocks::Strategy::Correct);
1054
1055                // Create reporter with verifier scheme
1056                let (reporter, reporter_mailbox) =
1057                    mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
1058                reporter.start();
1059                reporters.insert(participant.clone(), reporter_mailbox.clone());
1060
1061                // Create blocker
1062                let blocker = oracle.control(participant.clone());
1063
1064                // Create and start engine
1065                let engine = Engine::new(
1066                    context.child("engine"),
1067                    Config {
1068                        monitor,
1069                        provider,
1070                        automaton,
1071                        reporter: reporter_mailbox,
1072                        blocker,
1073                        priority_acks: false,
1074                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1075                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1076                        window: std::num::NonZeroU64::new(10).unwrap(),
1077                        activity_timeout: HeightDelta::new(100),
1078                        journal_partition: format!("aggregation-{participant}"),
1079                        journal_write_buffer: NZUsize!(4096),
1080                        journal_replay_buffer: NZUsize!(4096),
1081                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1082                        journal_compression: Some(3),
1083                        journal_page_cache: CacheRef::from_pooler(
1084                            &context,
1085                            PAGE_SIZE,
1086                            PAGE_CACHE_SIZE,
1087                        ),
1088                        strategy: Sequential,
1089                    },
1090                );
1091
1092                let (sender, receiver) = registrations.remove(participant).unwrap();
1093                engine.start((sender, receiver));
1094            }
1095
1096            // With insufficient validators, consensus should not be achievable
1097            // Wait long enough for any potential consensus attempts to complete
1098            context.sleep(Duration::from_secs(12)).await;
1099
1100            // Check that no validator achieved consensus
1101            let mut any_consensus = false;
1102            for (validator_pk, mut reporter_mailbox) in reporters {
1103                let (tip, _) = reporter_mailbox
1104                    .get_tip()
1105                    .await
1106                    .unwrap_or((Height::zero(), Epoch::zero()));
1107                if !tip.is_zero() {
1108                    any_consensus = true;
1109                    tracing::warn!(
1110                        ?validator_pk,
1111                        %tip,
1112                        "Unexpected consensus with insufficient validators"
1113                    );
1114                }
1115            }
1116
1117            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
1118            assert!(
1119                !any_consensus,
1120                "Consensus should not be achieved with insufficient validator participation (below quorum)"
1121            );
1122        });
1123    }
1124
1125    #[test_traced("INFO")]
1126    fn test_insufficient_validators() {
1127        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
1128        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
1129        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
1130        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
1131        insufficient_validators(ed25519::fixture);
1132        insufficient_validators(secp256r1::fixture);
1133    }
1134}