Skip to main content

commonware_consensus/aggregation/
mod.rs

1//! Recover quorum certificates over an externally synchronized sequence of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce quorum certificates
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
11//! _For applications that want to collect quorum certificates over concurrent, sequencer-driven broadcast,
12//! check out [crate::ordered_broadcast]._
13//!
14//! # Pluggable Cryptography
15//!
16//! The aggregation module is generic over the signing scheme, allowing users to choose the
17//! cryptographic scheme best suited for their requirements:
18//!
19//! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification.
20//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
21//!
22//! - [`secp256r1`][scheme::secp256r1]: Attributable signatures with individual verification.
23//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
24//!
25//! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated
26//!   verification. Produces compact certificates while preserving signer attribution.
27//!
28//! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures.
29//!   Produces succinct constant-size certificates. Requires trusted setup (DKG).
30//!
31//! # Architecture
32//!
33//! The core of the module is the [Engine]. It manages the agreement process by:
34//! - Requesting externally synchronized [commonware_cryptography::Digest]s
35//! - Signing said digests with the configured scheme's signature type
36//! - Multicasting signatures/shares to other validators
37//! - Assembling certificates from a quorum of signatures
38//! - Monitoring recovery progress and notifying the application layer of recoveries
39//!
40//! The engine interacts with four main components:
41//! - [crate::Automaton]: Provides external digests
42//! - [crate::Reporter]: Receives agreement confirmations
43//! - [crate::Monitor]: Tracks epoch transitions
44//! - [commonware_cryptography::certificate::Provider]: Manages validator sets and network identities
45//!
46//! # Design Decisions
47//!
48//! ## Missing Certificate Resolution
49//!
50//! The engine does not try to "fill gaps" when certificates are missing. When validators
51//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
52//! certificates may never be emitted by the local engine. Before skipping ahead, we ensure that
53//! at least one honest validator has the certificate for any skipped height.
54//!
55//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
56//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
57//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
58//! historical results.
59//!
60//! ## Recovering Certificates
61//!
62//! In aggregation, participants never gossip recovered certificates. Rather, they gossip [types::TipAck]s
63//! with signatures over some height and their latest tip. This approach reduces the overhead of running aggregation
64//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
65//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
66//! drift of online participants (even if all participants are synchronous the tip advancement logic will advance to the `f+1`th highest
67//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).
68
// Signing-scheme implementations (see module docs above: ed25519, secp256r1,
// bls12381_multisig, bls12381_threshold).
pub mod scheme;
// Wire/storage types for aggregation (e.g. [types::Certificate], [types::TipAck]).
pub mod types;

// The engine and its supporting modules are compiled out on wasm32; only the
// `scheme` and `types` modules above are available on that target.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Test-only mock implementations (Automaton, Reporter, Monitor, Provider).
        #[cfg(test)]
        pub mod mocks;
    }
}
85
86#[cfg(test)]
87mod tests {
88    use super::{mocks, Config, Engine};
89    use crate::{
90        aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91        types::{Epoch, EpochDelta, Height, HeightDelta},
92    };
93    use commonware_cryptography::{
94        bls12381::primitives::variant::{MinPk, MinSig},
95        certificate::mocks::Fixture,
96        ed25519::PublicKey,
97        sha256::Digest as Sha256Digest,
98    };
99    use commonware_macros::{select, test_group, test_traced};
100    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101    use commonware_parallel::Sequential;
102    use commonware_runtime::{
103        buffer::paged::CacheRef,
104        deterministic::{self, Context},
105        Clock, Metrics, Quota, Runner, Spawner,
106    };
107    use commonware_utils::{
108        channel::{fallible::OneshotExt, oneshot},
109        test_rng, NZUsize, NonZeroDuration, NZU16,
110    };
111    use futures::future::join_all;
112    use rand::{rngs::StdRng, Rng};
113    use std::{
114        collections::BTreeMap,
115        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116        time::Duration,
117    };
118    use tracing::debug;
119
    /// Per-participant network registration: the (sender, receiver) channel
    /// pair returned by the simulated network oracle, keyed by public key.
    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

    // Journal page-cache geometry shared by every engine in these tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Maximum-rate quota so the simulated network layer never throttles tests.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
    // Namespace mixed into all signatures produced by the fixtures.
    const TEST_NAMESPACE: &[u8] = b"my testing namespace";

    /// Reliable network link configuration for testing.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
133
134    /// Register all participants with the network oracle.
135    async fn register_participants(
136        oracle: &mut Oracle<PublicKey, deterministic::Context>,
137        participants: &[PublicKey],
138    ) -> Registrations<PublicKey> {
139        let mut registrations = BTreeMap::new();
140        for participant in participants.iter() {
141            let (sender, receiver) = oracle
142                .control(participant.clone())
143                .register(0, TEST_QUOTA)
144                .await
145                .unwrap();
146            registrations.insert(participant.clone(), (sender, receiver));
147        }
148        registrations
149    }
150
151    /// Establish network links between all participants.
152    async fn link_participants(
153        oracle: &mut Oracle<PublicKey, deterministic::Context>,
154        participants: &[PublicKey],
155        link: Link,
156    ) {
157        for v1 in participants.iter() {
158            for v2 in participants.iter() {
159                if v2 == v1 {
160                    continue;
161                }
162                oracle
163                    .add_link(v1.clone(), v2.clone(), link.clone())
164                    .await
165                    .unwrap();
166            }
167        }
168    }
169
170    /// Initialize a simulated network environment.
171    async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172        context: Context,
173        fixture: &Fixture<S>,
174        link: Link,
175    ) -> (
176        Oracle<PublicKey, deterministic::Context>,
177        Registrations<PublicKey>,
178    ) {
179        let (network, mut oracle) = Network::new(
180            context.with_label("network"),
181            commonware_p2p::simulated::Config {
182                max_size: 1024 * 1024,
183                disconnect_on_block: true,
184                tracked_peer_sets: None,
185            },
186        );
187        network.start();
188
189        let registrations = register_participants(&mut oracle, &fixture.participants).await;
190        link_participants(&mut oracle, &fixture.participants, link).await;
191
192        (oracle, registrations)
193    }
194
    /// Spawn aggregation engines for all validators.
    ///
    /// For each participant in `fixture`, wires up mock components (provider,
    /// monitor, automaton, reporter, blocker), builds an [Engine] with a
    /// shared test `Config`, and starts it on the participant's registered
    /// network channels. Validators whose index appears in `incorrect` use the
    /// `Incorrect` automaton strategy; all others use `Correct`.
    ///
    /// Returns one reporter mailbox per participant so tests can observe
    /// per-validator progress.
    fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
        context: Context,
        fixture: &Fixture<S>,
        registrations: &mut Registrations<PublicKey>,
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        epoch: Epoch,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
        let mut reporters = BTreeMap::new();

        for (idx, participant) in fixture.participants.iter().enumerate() {
            let context = context.with_label(&format!("participant_{participant}"));

            // Create Provider and register scheme for epoch
            let provider = mocks::Provider::new();
            assert!(provider.register(epoch, fixture.schemes[idx].clone()));

            // Create monitor
            let monitor = mocks::Monitor::new(epoch);

            // Create automaton with Incorrect strategy for byzantine validators
            let strategy = if incorrect.contains(&idx) {
                mocks::Strategy::Incorrect
            } else {
                mocks::Strategy::Correct
            };
            let automaton = mocks::Application::new(strategy);

            // Create reporter with verifier scheme
            let (reporter, reporter_mailbox) =
                mocks::Reporter::new(context.clone(), fixture.verifier.clone());
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(participant.clone(), reporter_mailbox.clone());

            // Create blocker
            let blocker = oracle.control(participant.clone());

            // Create and start engine
            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    provider,
                    automaton,
                    reporter: reporter_mailbox,
                    blocker,
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: HeightDelta::new(100),
                    journal_partition: format!("aggregation-{participant}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    strategy: Sequential,
                },
            );

            // Hand the engine its registered network channels and start it.
            let (sender, receiver) = registrations.remove(participant).unwrap();
            engine.start((sender, receiver));
        }

        reporters
    }
264
265    /// Wait for all reporters to reach the specified consensus threshold.
266    async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
267        context: Context,
268        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
269        threshold_height: Height,
270        threshold_epoch: Epoch,
271    ) {
272        let mut receivers = Vec::new();
273        for (reporter, mailbox) in reporters.iter() {
274            // Create a oneshot channel to signal when the reporter has reached the threshold.
275            let (tx, rx) = oneshot::channel();
276            receivers.push(rx);
277
278            context.with_label("reporter_watcher").spawn({
279                let reporter = reporter.clone();
280                let mut mailbox = mailbox.clone();
281                move |context| async move {
282                    loop {
283                        let (height, epoch) = mailbox
284                            .get_tip()
285                            .await
286                            .unwrap_or((Height::zero(), Epoch::zero()));
287                        debug!(
288                            %height,
289                            epoch = %epoch,
290                            %threshold_height,
291                            threshold_epoch = %threshold_epoch,
292                            ?reporter,
293                            "reporter status"
294                        );
295                        if height >= threshold_height && epoch >= threshold_epoch {
296                            debug!(
297                                ?reporter,
298                                "reporter reached threshold, signaling completion"
299                            );
300                            tx.send_lossy(reporter.clone());
301                            break;
302                        }
303                        context.sleep(Duration::from_millis(100)).await;
304                    }
305                }
306            });
307        }
308
309        // Wait for all oneshot receivers to complete.
310        let results = join_all(receivers).await;
311        assert_eq!(results.len(), reporters.len());
312
313        // Check that none were cancelled.
314        for result in results {
315            assert!(result.is_ok(), "reporter was cancelled");
316        }
317    }
318
319    /// Test aggregation consensus with all validators online.
320    fn all_online<S, F>(fixture: F)
321    where
322        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
323        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
324    {
325        let runner = deterministic::Runner::timed(Duration::from_secs(30));
326
327        runner.start(|mut context| async move {
328            let num_validators = 4;
329            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
330            let epoch = Epoch::new(111);
331
332            let (mut oracle, mut registrations) =
333                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
334                    .await;
335
336            let reporters = spawn_validator_engines(
337                context.with_label("validator"),
338                &fixture,
339                &mut registrations,
340                &mut oracle,
341                epoch,
342                Duration::from_secs(5),
343                vec![],
344            );
345
346            await_reporters(
347                context.with_label("reporter"),
348                &reporters,
349                Height::new(100),
350                epoch,
351            )
352            .await;
353        });
354    }
355
    // Run the all-online scenario once per supported signing scheme (both BLS
    // variants for threshold and multisig, plus ed25519 and secp256r1).
    #[test_traced("INFO")]
    fn test_all_online() {
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
        all_online(secp256r1::fixture);
    }
365
366    /// Test consensus resilience to Byzantine behavior.
367    fn byzantine_proposer<S, F>(fixture: F)
368    where
369        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
370        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
371    {
372        let runner = deterministic::Runner::timed(Duration::from_secs(30));
373
374        runner.start(|mut context| async move {
375            let num_validators = 4;
376            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
377            let epoch = Epoch::new(111);
378
379            let (mut oracle, mut registrations) =
380                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
381                    .await;
382
383            let reporters = spawn_validator_engines(
384                context.with_label("validator"),
385                &fixture,
386                &mut registrations,
387                &mut oracle,
388                epoch,
389                Duration::from_secs(5),
390                vec![0],
391            );
392
393            await_reporters(
394                context.with_label("reporter"),
395                &reporters,
396                Height::new(100),
397                epoch,
398            )
399            .await;
400        });
401    }
402
    // Run the byzantine-proposer scenario once per supported signing scheme.
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
        byzantine_proposer(ed25519::fixture);
        byzantine_proposer(secp256r1::fixture);
    }
412
    /// Aggregation must survive repeated unclean shutdowns while one validator
    /// (index 0) is byzantine.
    ///
    /// Runs the whole network inside a deterministic runtime, kills it at a
    /// random instant, then restarts from the recovered checkpoint so engines
    /// replay their journals into a shared reporter. Succeeds once the shared
    /// reporter holds a contiguous certificate history up to `target_height`
    /// after at least `min_shutdowns` restarts (and within `max_shutdowns`).
    fn unclean_byzantine_shutdown<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        // Test parameters
        let num_validators = 4;
        let target_height = Height::new(200); // Target multiple rounds of signing
        let min_shutdowns = 4; // Minimum number of shutdowns per validator
        let max_shutdowns = 10; // Maximum number of shutdowns per validator
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);
        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        let mut prev_checkpoint = None;

        // Generate fixture once (persists across restarts)
        let mut rng = test_rng();
        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);

        // Continue until shared reporter reaches target or max shutdowns exceeded
        let mut shutdown_count = 0;
        while shutdown_count < max_shutdowns {
            let fixture = fixture.clone();
            // One simulated "process lifetime": resolves to `true` on clean
            // completion, `false` when the random kill timer fires first.
            let f = move |mut context: Context| {
                async move {
                    let epoch = Epoch::new(111);

                    let (oracle, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        &fixture,
                        RELIABLE_LINK,
                    )
                    .await;

                    // Create a shared reporter
                    //
                    // We rely on replay to populate this reporter with a contiguous history of certificates.
                    let (reporter, mut reporter_mailbox) =
                        mocks::Reporter::new(context.clone(), fixture.verifier.clone());
                    context.with_label("reporter").spawn(|_| reporter.run());

                    // Spawn validator engines
                    for (idx, participant) in fixture.participants.iter().enumerate() {
                        let validator_context =
                            context.with_label(&format!("participant_{participant}"));

                        // Create Provider and register scheme for epoch
                        let provider = mocks::Provider::new();
                        assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                        // Create monitor
                        let monitor = mocks::Monitor::new(epoch);

                        // Create automaton (validator 0 is Byzantine)
                        let strategy = if idx == 0 {
                            mocks::Strategy::Incorrect
                        } else {
                            mocks::Strategy::Correct
                        };
                        let automaton = mocks::Application::new(strategy);

                        // Create blocker
                        let blocker = oracle.control(participant.clone());

                        // Create and start engine
                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                provider,
                                automaton,
                                reporter: reporter_mailbox.clone(),
                                blocker,
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                activity_timeout: HeightDelta::new(1_024), // ensure we don't drop any certificates
                                journal_partition: format!("unclean_shutdown_test_{participant}"),
                                journal_write_buffer: NZUsize!(4096),
                                journal_replay_buffer: NZUsize!(4096),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                                journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                                strategy: Sequential,
                            },
                        );

                        let (sender, receiver) = registrations.remove(participant).unwrap();
                        engine.start((sender, receiver));
                    }

                    // Create a single completion watcher for the shared reporter
                    let completion =
                        context
                            .with_label("completion_watcher")
                            .spawn(move |context| async move {
                                loop {
                                    if let Some(tip_height) =
                                        reporter_mailbox.get_contiguous_tip().await
                                    {
                                        if tip_height >= target_height {
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            });

                    // Random shutdown timing to simulate unclean shutdown
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            false // Unclean shutdown
                        },
                        _ = completion => {
                            debug!("Shared reporter completed normally");
                            true // Clean completion
                        },
                    }
                }
            };

            // First iteration starts fresh; later iterations resume from the
            // checkpoint captured at the previous shutdown.
            let (complete, checkpoint) = prev_checkpoint
                .map_or_else(
                    || {
                        debug!("Starting initial run");
                        deterministic::Runner::timed(Duration::from_secs(45))
                    },
                    |prev_checkpoint| {
                        debug!(shutdown_count, "Restarting from previous context");
                        deterministic::Runner::from(prev_checkpoint)
                    },
                )
                .start_and_recover(f);

            if complete && shutdown_count >= min_shutdowns {
                debug!("Test completed successfully");
                break;
            }

            prev_checkpoint = Some(checkpoint);
            shutdown_count += 1;
        }
    }
560
    // Run the unclean-shutdown scenario once per supported signing scheme.
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(ed25519::fixture);
        unclean_byzantine_shutdown(secp256r1::fixture);
    }
570
571    fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
572    where
573        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
574        F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
575    {
576        // Test parameters
577        let num_validators = 4;
578        let skip_height = Height::new(50); // Height where no one will sign
579        let window = HeightDelta::new(10);
580        let target_height = Height::new(100);
581
582        // Generate fixture once (persists across restarts)
583        let mut rng = test_rng();
584        let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
585
586        // First run: let validators skip signing at skip_height and reach beyond it
587        let f = |context: Context| {
588            let fixture = fixture.clone();
589            async move {
590                let epoch = Epoch::new(111);
591
592                // Set up simulated network
593                let (oracle, mut registrations) = initialize_simulation(
594                    context.with_label("simulation"),
595                    &fixture,
596                    RELIABLE_LINK,
597                )
598                .await;
599
600                // Create a shared reporter
601                let (reporter, mut reporter_mailbox) =
602                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
603                context.with_label("reporter").spawn(|_| reporter.run());
604
605                // Start validator engines with Skip strategy for skip_height
606                for (idx, participant) in fixture.participants.iter().enumerate() {
607                    let validator_context =
608                        context.with_label(&format!("participant_{participant}"));
609
610                    // Create Provider and register scheme for epoch
611                    let provider = mocks::Provider::new();
612                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
613
614                    // Create monitor
615                    let monitor = mocks::Monitor::new(epoch);
616
617                    // All validators use Skip strategy for skip_height
618                    let automaton = mocks::Application::new(mocks::Strategy::Skip {
619                        height: skip_height,
620                    });
621
622                    // Create blocker
623                    let blocker = oracle.control(participant.clone());
624
625                    // Create and start engine
626                    let engine = Engine::new(
627                        validator_context.with_label("engine"),
628                        Config {
629                            monitor,
630                            provider,
631                            automaton,
632                            reporter: reporter_mailbox.clone(),
633                            blocker,
634                            priority_acks: false,
635                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
636                                100,
637                            )),
638                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
639                            window: std::num::NonZeroU64::new(window.get()).unwrap(),
640                            activity_timeout: HeightDelta::new(100),
641                            journal_partition: format!("unsigned_height_test_{participant}"),
642                            journal_write_buffer: NZUsize!(4096),
643                            journal_replay_buffer: NZUsize!(4096),
644                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
645                            journal_compression: Some(3),
646                            journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
647                            strategy: Sequential,
648                        },
649                    );
650
651                    let (sender, receiver) = registrations.remove(participant).unwrap();
652                    engine.start((sender, receiver));
653                }
654
655                // Wait for validators to reach target_height (past skip_height)
656                loop {
657                    if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
658                        debug!(%tip_height, %skip_height, %target_height, "reporter status");
659                        if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
660                            // max we can proceed before item confirmed
661                            return;
662                        }
663                    }
664                    context.sleep(Duration::from_millis(50)).await;
665                }
666            }
667        };
668
669        let (_, checkpoint) =
670            deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
671
672        // Second run: restart and verify the skip_height gets confirmed
673        let f2 = |context: Context| {
674            async move {
675                let epoch = Epoch::new(111);
676
677                // Set up simulated network
678                let (oracle, mut registrations) = initialize_simulation(
679                    context.with_label("simulation"),
680                    &fixture,
681                    RELIABLE_LINK,
682                )
683                .await;
684
685                // Create a shared reporter
686                let (reporter, mut reporter_mailbox) =
687                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
688                context.with_label("reporter").spawn(|_| reporter.run());
689
690                // Start validator engines with Correct strategy (will sign everything now)
691                for (idx, participant) in fixture.participants.iter().enumerate() {
692                    let validator_context =
693                        context.with_label(&format!("participant_{participant}"));
694
695                    // Create Provider and register scheme for epoch
696                    let provider = mocks::Provider::new();
697                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));
698
699                    // Create monitor
700                    let monitor = mocks::Monitor::new(epoch);
701
702                    // Now all validators use Correct strategy
703                    let automaton = mocks::Application::new(mocks::Strategy::Correct);
704
705                    // Create blocker
706                    let blocker = oracle.control(participant.clone());
707
708                    // Create and start engine
709                    let engine = Engine::new(
710                        validator_context.with_label("engine"),
711                        Config {
712                            monitor,
713                            provider,
714                            automaton,
715                            reporter: reporter_mailbox.clone(),
716                            blocker,
717                            priority_acks: false,
718                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
719                                100,
720                            )),
721                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
722                            window: std::num::NonZeroU64::new(10).unwrap(),
723                            activity_timeout: HeightDelta::new(100),
724                            journal_partition: format!("unsigned_height_test_{participant}"),
725                            journal_write_buffer: NZUsize!(4096),
726                            journal_replay_buffer: NZUsize!(4096),
727                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
728                            journal_compression: Some(3),
729                            journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
730                            strategy: Sequential,
731                        },
732                    );
733
734                    let (sender, receiver) = registrations.remove(participant).unwrap();
735                    engine.start((sender, receiver));
736                }
737
738                // Wait for skip_height to be confirmed (should happen on replay)
739                loop {
740                    if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
741                        debug!(
742                            %tip_height,
743                            %skip_height, %target_height, "reporter status on restart"
744                        );
745                        if tip_height >= target_height {
746                            break;
747                        }
748                    }
749                    context.sleep(Duration::from_millis(50)).await;
750                }
751            }
752        };
753
754        deterministic::Runner::from(checkpoint).start(f2);
755    }
756
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_height() {
        // Run the unclean-shutdown recovery scenario once per supported signing
        // scheme. Each call monomorphizes the helper with a different fixture
        // type, so the calls cannot be collapsed into a loop.
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(ed25519::fixture);
        unclean_shutdown_with_unsigned_height(secp256r1::fixture);
    }
766
767    fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
768    where
769        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
770        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
771    {
772        let cfg = deterministic::Config::new()
773            .with_seed(seed)
774            .with_timeout(Some(Duration::from_secs(120)));
775        let runner = deterministic::Runner::new(cfg);
776
777        runner.start(|mut context| async move {
778            let num_validators = 4;
779            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
780            let epoch = Epoch::new(111);
781
782            // Use degraded network links with realistic conditions
783            let degraded_link = Link {
784                latency: Duration::from_millis(200),
785                jitter: Duration::from_millis(150),
786                success_rate: 0.5,
787            };
788
789            let (mut oracle, mut registrations) =
790                initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
791                    .await;
792
793            let reporters = spawn_validator_engines(
794                context.with_label("validator"),
795                &fixture,
796                &mut registrations,
797                &mut oracle,
798                epoch,
799                Duration::from_secs(2),
800                vec![],
801            );
802
803            await_reporters(
804                context.with_label("reporter"),
805                &reporters,
806                Height::new(100),
807                epoch,
808            )
809            .await;
810
811            context.auditor().state()
812        })
813    }
814
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        // Exercise the degraded-network scenario for every signing scheme with
        // a fixed seed (0); determinism across seeds is covered separately.
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
        slow_and_lossy_links(secp256r1::fixture, 0);
    }
824
825    #[test_group("slow")]
826    #[test_traced("INFO")]
827    fn test_determinism() {
828        // We use slow and lossy links as the deterministic test
829        // because it is the most complex test.
830        for seed in 1..6 {
831            // Test BLS threshold MinPk
832            let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
833            let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
834            assert_eq!(ts_pk_state_1, ts_pk_state_2);
835
836            // Test BLS threshold MinSig
837            let ts_sig_state_1 =
838                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
839            let ts_sig_state_2 =
840                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
841            assert_eq!(ts_sig_state_1, ts_sig_state_2);
842
843            // Test BLS multisig MinPk
844            let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
845            let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
846            assert_eq!(ms_pk_state_1, ms_pk_state_2);
847
848            // Test BLS multisig MinSig
849            let ms_sig_state_1 =
850                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
851            let ms_sig_state_2 =
852                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
853            assert_eq!(ms_sig_state_1, ms_sig_state_2);
854
855            // Test ed25519
856            let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
857            let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
858            assert_eq!(ed_state_1, ed_state_2);
859
860            // Test secp256r1
861            let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
862            let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
863            assert_eq!(secp_state_1, secp_state_2);
864
865            let states = [
866                ("threshold-minpk", ts_pk_state_1),
867                ("threshold-minsig", ts_sig_state_1),
868                ("multisig-minpk", ms_pk_state_1),
869                ("multisig-minsig", ms_sig_state_1),
870                ("ed25519", ed_state_1),
871                ("secp256r1", secp_state_1),
872            ];
873
874            // Sanity check that different types can't be identical
875            for pair in states.windows(2) {
876                assert_ne!(
877                    pair[0].1, pair[1].1,
878                    "state {} equals state {}",
879                    pair[0].0, pair[1].0
880                );
881            }
882        }
883    }
884
885    fn one_offline<S, F>(fixture: F)
886    where
887        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
888        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
889    {
890        let runner = deterministic::Runner::timed(Duration::from_secs(30));
891
892        runner.start(|mut context| async move {
893            let num_validators = 5;
894            let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
895            let epoch = Epoch::new(111);
896
897            // Truncate to only 4 validators (one offline)
898            fixture.participants.truncate(4);
899            fixture.schemes.truncate(4);
900
901            let (mut oracle, mut registrations) =
902                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
903                    .await;
904
905            let reporters = spawn_validator_engines(
906                context.with_label("validator"),
907                &fixture,
908                &mut registrations,
909                &mut oracle,
910                epoch,
911                Duration::from_secs(5),
912                vec![],
913            );
914
915            await_reporters(
916                context.with_label("reporter"),
917                &reporters,
918                Height::new(100),
919                epoch,
920            )
921            .await;
922        });
923    }
924
    #[test_traced("INFO")]
    fn test_one_offline() {
        // One-of-five-offline scenario, repeated for every signing scheme.
        one_offline(bls12381_threshold::fixture::<MinPk, _>);
        one_offline(bls12381_threshold::fixture::<MinSig, _>);
        one_offline(bls12381_multisig::fixture::<MinPk, _>);
        one_offline(bls12381_multisig::fixture::<MinSig, _>);
        one_offline(ed25519::fixture);
        one_offline(secp256r1::fixture);
    }
934
935    /// Test consensus recovery after a network partition.
936    fn network_partition<S, F>(fixture: F)
937    where
938        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
939        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
940    {
941        let runner = deterministic::Runner::timed(Duration::from_secs(60));
942
943        runner.start(|mut context| async move {
944            let num_validators = 4;
945            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
946            let epoch = Epoch::new(111);
947
948            let (mut oracle, mut registrations) =
949                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
950                    .await;
951
952            let reporters = spawn_validator_engines(
953                context.with_label("validator"),
954                &fixture,
955                &mut registrations,
956                &mut oracle,
957                epoch,
958                Duration::from_secs(5),
959                vec![],
960            );
961
962            // Partition network (remove all links)
963            for v1 in fixture.participants.iter() {
964                for v2 in fixture.participants.iter() {
965                    if v2 == v1 {
966                        continue;
967                    }
968                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
969                }
970            }
971            context.sleep(Duration::from_secs(20)).await;
972
973            // Restore network links
974            for v1 in fixture.participants.iter() {
975                for v2 in fixture.participants.iter() {
976                    if v2 == v1 {
977                        continue;
978                    }
979                    oracle
980                        .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
981                        .await
982                        .unwrap();
983                }
984            }
985
986            await_reporters(
987                context.with_label("reporter"),
988                &reporters,
989                Height::new(100),
990                epoch,
991            )
992            .await;
993        });
994    }
995
    #[test_traced("INFO")]
    fn test_network_partition() {
        // Partition-and-heal scenario, repeated for every signing scheme.
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
        network_partition(secp256r1::fixture);
    }
1005
    /// Test insufficient validator participation (below quorum).
    ///
    /// Starts only 2 of 5 validators (quorum is 3) and asserts that no
    /// reporter ever observes a non-zero tip, i.e. no certificate is formed.
    fn insufficient_validators<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    {
        let runner = deterministic::Runner::timed(Duration::from_secs(15));

        runner.start(|mut context| async move {
            let num_validators = 5;
            let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
            let epoch = Epoch::new(111);

            // Set up simulated network
            let (oracle, mut registrations) =
                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
                    .await;

            // Create reporters (one per online validator)
            let mut reporters =
                BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();

            // Start only 2 out of 5 validators (below quorum of 3)
            for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
                let context = context.with_label(&format!("participant_{participant}"));

                // Create Provider and register scheme for epoch
                let provider = mocks::Provider::new();
                assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                // Create monitor
                let monitor = mocks::Monitor::new(epoch);

                // Create automaton with Correct strategy
                let automaton = mocks::Application::new(mocks::Strategy::Correct);

                // Create reporter with verifier scheme
                let (reporter, reporter_mailbox) =
                    mocks::Reporter::new(context.clone(), fixture.verifier.clone());
                context.with_label("reporter").spawn(|_| reporter.run());
                reporters.insert(participant.clone(), reporter_mailbox.clone());

                // Create blocker
                let blocker = oracle.control(participant.clone());

                // Create and start engine
                let engine = Engine::new(
                    context.with_label("engine"),
                    Config {
                        monitor,
                        provider,
                        automaton,
                        reporter: reporter_mailbox,
                        blocker,
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                        window: std::num::NonZeroU64::new(10).unwrap(),
                        activity_timeout: HeightDelta::new(100),
                        journal_partition: format!("aggregation-{participant}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                        strategy: Sequential,
                    },
                );

                let (sender, receiver) = registrations.remove(participant).unwrap();
                engine.start((sender, receiver));
            }

            // With insufficient validators, consensus should not be achievable
            // Wait long enough for any potential consensus attempts to complete
            context.sleep(Duration::from_secs(12)).await;

            // Check that no validator achieved consensus
            let mut any_consensus = false;
            for (validator_pk, mut reporter_mailbox) in reporters {
                // A reporter that never observed a certificate reports no tip;
                // default to (zero, zero) so the check below treats it as
                // "no consensus".
                let (tip, _) = reporter_mailbox
                    .get_tip()
                    .await
                    .unwrap_or((Height::zero(), Epoch::zero()));
                if !tip.is_zero() {
                    any_consensus = true;
                    tracing::warn!(
                        ?validator_pk,
                        %tip,
                        "Unexpected consensus with insufficient validators"
                    );
                }
            }

            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
            assert!(
                !any_consensus,
                "Consensus should not be achieved with insufficient validator participation (below quorum)"
            );
        });
    }
1107
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        // Below-quorum scenario, repeated for every signing scheme.
        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
        insufficient_validators(ed25519::fixture);
        insufficient_validators(secp256r1::fixture);
    }
1117}