// `scheme` and `types` are declared outside the `cfg_if!` block, so they are built on every target.
pub mod scheme;
pub mod types;
// Everything inside this block is only built when the target architecture is not wasm32.
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod metrics;
mod safe_tip;
// Mock implementations used by the tests below; only built under `cfg(test)`.
#[cfg(test)]
pub mod mocks;
}
}
#[cfg(test)]
mod tests {
use super::{mocks, Config, Engine};
use crate::{
aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
types::{Epoch, EpochDelta, Height, HeightDelta},
};
use commonware_cryptography::{
bls12381::primitives::variant::{MinPk, MinSig},
certificate::mocks::Fixture,
ed25519::PublicKey,
sha256::Digest as Sha256Digest,
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
Clock, Metrics, Quota, Runner, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
test_rng, NZUsize, NonZeroDuration, NZU16,
};
use futures::future::join_all;
use rand::{rngs::StdRng, Rng};
use std::{
collections::BTreeMap,
num::{NonZeroU16, NonZeroU32, NonZeroUsize},
time::Duration,
};
use tracing::debug;
// Sender/receiver pair registered on the simulated network, keyed by participant.
type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;
// Page-cache parameters passed to `CacheRef::from_pooler` when building each engine's config.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
// Quota used for every channel registration (`NonZeroU32::MAX` per second).
// NOTE(review): presumably chosen so rate limiting never interferes with the tests - confirm.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
// Namespace handed to every scheme fixture constructor.
const TEST_NAMESPACE: &[u8] = b"my testing namespace";
// Link with 10ms latency, 1ms jitter and a success rate of 1.0.
const RELIABLE_LINK: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
/// Registers channel `0` (with [`TEST_QUOTA`]) for each participant and returns the
/// resulting sender/receiver pairs keyed by the participant's public key.
///
/// # Panics
///
/// Panics if any registration fails.
async fn register_participants(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    participants: &[PublicKey],
) -> Registrations<PublicKey> {
    let mut channels: Registrations<PublicKey> = BTreeMap::new();
    for public_key in participants {
        let channel_pair = oracle
            .control(public_key.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        channels.insert(public_key.clone(), channel_pair);
    }
    channels
}
/// Adds a directed link with the given `link` parameters between every ordered pair of
/// distinct participants.
///
/// # Panics
///
/// Panics if the oracle rejects any link.
async fn link_participants(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    participants: &[PublicKey],
    link: Link,
) {
    for source in participants {
        for target in participants {
            // A participant is never linked to itself.
            if target != source {
                oracle
                    .add_link(source.clone(), target.clone(), link.clone())
                    .await
                    .unwrap();
            }
        }
    }
}
/// Builds and starts a simulated network containing the fixture's participants, registers a
/// channel for each of them and connects every pair with `link`.
///
/// Returns the network oracle together with the per-participant channel registrations.
async fn initialize_simulation<S>(
    context: Context,
    fixture: &Fixture<S>,
    link: Link,
) -> (
    Oracle<PublicKey, deterministic::Context>,
    Registrations<PublicKey>,
)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
{
    let network_context = context.with_label("network");
    let network_config = commonware_p2p::simulated::Config {
        max_size: 1024 * 1024,
        disconnect_on_block: true,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, mut oracle) = Network::new_with_peers(
        network_context,
        network_config,
        fixture.participants.clone(),
    )
    .await;
    network.start();

    // Channels first, then links, mirroring the order the tests rely on.
    let registrations = register_participants(&mut oracle, &fixture.participants).await;
    link_participants(&mut oracle, &fixture.participants, link).await;

    (oracle, registrations)
}
fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
context: Context,
fixture: &Fixture<S>,
registrations: &mut Registrations<PublicKey>,
oracle: &mut Oracle<PublicKey, deterministic::Context>,
epoch: Epoch,
rebroadcast_timeout: Duration,
incorrect: Vec<usize>,
) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
let mut reporters = BTreeMap::new();
for (idx, participant) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("participant_{participant}"));
let provider = mocks::Provider::new();
assert!(provider.register(epoch, fixture.schemes[idx].clone()));
let monitor = mocks::Monitor::new(epoch);
let strategy = if incorrect.contains(&idx) {
mocks::Strategy::Incorrect
} else {
mocks::Strategy::Correct
};
let automaton = mocks::Application::new(strategy);
let (reporter, reporter_mailbox) =
mocks::Reporter::new(context.clone(), fixture.verifier.clone());
context.with_label("reporter").spawn(|_| reporter.run());
reporters.insert(participant.clone(), reporter_mailbox.clone());
let blocker = oracle.control(participant.clone());
let engine = Engine::new(
context.with_label("engine"),
Config {
monitor,
provider,
automaton,
reporter: reporter_mailbox,
blocker,
priority_acks: false,
rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
window: std::num::NonZeroU64::new(10).unwrap(),
activity_timeout: HeightDelta::new(100),
journal_partition: format!("aggregation-{participant}"),
journal_write_buffer: NZUsize!(4096),
journal_replay_buffer: NZUsize!(4096),
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);
let (sender, receiver) = registrations.remove(participant).unwrap();
engine.start((sender, receiver));
}
reporters
}
/// Waits until every reporter's tip is at or beyond `threshold_height` and
/// `threshold_epoch`.
///
/// One watcher task is spawned per reporter; it polls the mailbox every 100ms (treating a
/// missing tip as height zero / epoch zero) and signals over a oneshot channel once the
/// threshold is reached.
///
/// # Panics
///
/// Panics if any watcher's channel is dropped before it signals.
async fn await_reporters<S>(
    context: Context,
    reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
    threshold_height: Height,
    threshold_epoch: Epoch,
) where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
{
    let mut completions = Vec::with_capacity(reporters.len());
    for (reporter, mailbox) in reporters {
        let (done_tx, done_rx) = oneshot::channel();
        completions.push(done_rx);

        let reporter = reporter.clone();
        let mut mailbox = mailbox.clone();
        context
            .with_label("reporter_watcher")
            .spawn(move |context| async move {
                loop {
                    let (height, epoch) = mailbox
                        .get_tip()
                        .await
                        .unwrap_or((Height::zero(), Epoch::zero()));
                    debug!(
                        %height,
                        epoch = %epoch,
                        %threshold_height,
                        threshold_epoch = %threshold_epoch,
                        ?reporter,
                        "reporter status"
                    );

                    let reached = height >= threshold_height && epoch >= threshold_epoch;
                    if !reached {
                        // Not there yet: back off and poll again.
                        context.sleep(Duration::from_millis(100)).await;
                        continue;
                    }

                    debug!(
                        ?reporter,
                        "reporter reached threshold, signaling completion"
                    );
                    done_tx.send_lossy(reporter.clone());
                    break;
                }
            });
    }

    let outcomes = join_all(completions).await;
    assert_eq!(outcomes.len(), reporters.len());
    assert!(
        outcomes.iter().all(|outcome| outcome.is_ok()),
        "reporter was cancelled"
    );
}
/// Runs four correct validators over reliable links and waits for every reporter to reach
/// height 100 in epoch 111. The runner is created with a 30 second timeout.
fn all_online<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
        let fixture = fixture(&mut context, TEST_NAMESPACE, 4);
        let epoch = Epoch::new(111);

        let simulation_context = context.with_label("simulation");
        let (mut oracle, mut registrations) =
            initialize_simulation(simulation_context, &fixture, RELIABLE_LINK).await;

        // No validator is marked incorrect.
        let validator_context = context.with_label("validator");
        let reporters = spawn_validator_engines(
            validator_context,
            &fixture,
            &mut registrations,
            &mut oracle,
            epoch,
            Duration::from_secs(5),
            Vec::new(),
        );

        let target = Height::new(100);
        await_reporters(context.with_label("reporter"), &reporters, target, epoch).await;
    });
}
// Runs `all_online` once per scheme fixture: BLS12-381 threshold and multisig (each over
// `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_all_online() {
all_online(bls12381_threshold::fixture::<MinPk, _>);
all_online(bls12381_threshold::fixture::<MinSig, _>);
all_online(bls12381_multisig::fixture::<MinPk, _>);
all_online(bls12381_multisig::fixture::<MinSig, _>);
all_online(ed25519::fixture);
all_online(secp256r1::fixture);
}
/// Runs four validators over reliable links where the validator at index 0 uses
/// `Strategy::Incorrect`, and waits for every reporter to reach height 100 in epoch 111.
/// The runner is created with a 30 second timeout.
fn byzantine_proposer<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
        let fixture = fixture(&mut context, TEST_NAMESPACE, 4);
        let epoch = Epoch::new(111);

        let simulation_context = context.with_label("simulation");
        let (mut oracle, mut registrations) =
            initialize_simulation(simulation_context, &fixture, RELIABLE_LINK).await;

        // Only the first validator misbehaves.
        let validator_context = context.with_label("validator");
        let reporters = spawn_validator_engines(
            validator_context,
            &fixture,
            &mut registrations,
            &mut oracle,
            epoch,
            Duration::from_secs(5),
            vec![0],
        );

        let target = Height::new(100);
        await_reporters(context.with_label("reporter"), &reporters, target, epoch).await;
    });
}
// Runs `byzantine_proposer` once per scheme fixture: BLS12-381 threshold and multisig
// (each over `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_traced("INFO")]
fn test_byzantine_proposer() {
byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
byzantine_proposer(ed25519::fixture);
byzantine_proposer(secp256r1::fixture);
}
/// Repeatedly runs a four-validator network (validator 0 uses `Strategy::Incorrect`) and
/// interrupts each run after a random delay, resuming the next run from the checkpoint
/// returned by `start_and_recover`.
///
/// Each run races a randomly chosen sleep (between `shutdown_range_min` and
/// `shutdown_range_max`) against a watcher that finishes once the shared reporter's
/// contiguous tip reaches `target_height`. The outer loop stops as soon as a run completes
/// with `shutdown_count >= min_shutdowns`, or after `max_shutdowns` iterations.
///
/// NOTE(review): if the loop runs out of iterations without any run completing, the function
/// returns without failing - confirm this is intended.
fn unclean_byzantine_shutdown<S, F>(fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
{
let num_validators = 4;
// In order: contiguous tip to reach, lower and upper bound on loop iterations, and the
// lower bound of the random shutdown delay.
let target_height = Height::new(200); let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
let shutdown_range_max = Duration::from_millis(1_000);
// NOTE(review): presumably kept shorter than the shutdown delay so that restarted engines
// can make progress before the next interruption - confirm.
let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
let mut prev_checkpoint = None;
// The fixture is generated once, outside the runtime, and cloned into every run.
let mut rng = test_rng();
let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
let mut shutdown_count = 0;
while shutdown_count < max_shutdowns {
let fixture = fixture.clone();
// Body of a single run; evaluates to `true` if the target was reached before the
// simulated shutdown and `false` otherwise.
let f = move |mut context: Context| {
async move {
let epoch = Epoch::new(111);
let (oracle, mut registrations) = initialize_simulation(
context.with_label("simulation"),
&fixture,
RELIABLE_LINK,
)
.await;
// A single reporter is shared by all engines in this run.
let (reporter, mut reporter_mailbox) =
mocks::Reporter::new(context.clone(), fixture.verifier.clone());
context.with_label("reporter").spawn(|_| reporter.run());
for (idx, participant) in fixture.participants.iter().enumerate() {
let validator_context =
context.with_label(&format!("participant_{participant}"));
let provider = mocks::Provider::new();
assert!(provider.register(epoch, fixture.schemes[idx].clone()));
let monitor = mocks::Monitor::new(epoch);
// Only the first validator is given the incorrect strategy.
let strategy = if idx == 0 {
mocks::Strategy::Incorrect
} else {
mocks::Strategy::Correct
};
let automaton = mocks::Application::new(strategy);
let blocker = oracle.control(participant.clone());
let engine = Engine::new(
validator_context.with_label("engine"),
Config {
monitor,
provider,
automaton,
reporter: reporter_mailbox.clone(),
blocker,
priority_acks: false,
rebroadcast_timeout,
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
window: std::num::NonZeroU64::new(10).unwrap(),
// The journal partition name is the same in every run, so a restarted engine
// opens the partition written by the previous run.
activity_timeout: HeightDelta::new(1_024), journal_partition: format!("unclean_shutdown_test_{participant}"),
journal_write_buffer: NZUsize!(4096),
journal_replay_buffer: NZUsize!(4096),
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(
&context,
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);
let (sender, receiver) = registrations.remove(participant).unwrap();
engine.start((sender, receiver));
}
// Polls the shared reporter every 50ms until its contiguous tip reaches the target.
let completion =
context
.with_label("completion_watcher")
.spawn(move |context| async move {
loop {
if let Some(tip_height) =
reporter_mailbox.get_contiguous_tip().await
{
if tip_height >= target_height {
break;
}
}
context.sleep(Duration::from_millis(50)).await;
}
});
// Race the random shutdown delay against completion.
let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
select! {
_ = context.sleep(shutdown_wait) => {
debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
false },
_ = completion => {
debug!("Shared reporter completed normally");
true },
}
}
};
// The first iteration starts a fresh runner with a 45 second timeout; later iterations
// build the runner from the previous checkpoint.
let (complete, checkpoint) = prev_checkpoint
.map_or_else(
|| {
debug!("Starting initial run");
deterministic::Runner::timed(Duration::from_secs(45))
},
|prev_checkpoint| {
debug!(shutdown_count, "Restarting from previous context");
deterministic::Runner::from(prev_checkpoint)
},
)
.start_and_recover(f);
// A completed run only ends the test once enough iterations have happened.
if complete && shutdown_count >= min_shutdowns {
debug!("Test completed successfully");
break;
}
prev_checkpoint = Some(checkpoint);
shutdown_count += 1;
}
}
// Runs `unclean_byzantine_shutdown` once per scheme fixture: BLS12-381 threshold and
// multisig (each over `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_unclean_byzantine_shutdown() {
unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
unclean_byzantine_shutdown(ed25519::fixture);
unclean_byzantine_shutdown(secp256r1::fixture);
}
/// Checks that engines restarted from a checkpoint make contiguous progress past a height
/// that the first run skipped.
///
/// Phase one starts four validators whose application uses
/// `Strategy::Skip { height: skip_height }` and runs until the shared reporter's tip reaches
/// the height just before `skip_height + window`; the runner's checkpoint is captured at
/// that point. Phase two resumes from the checkpoint with the same journal partition names
/// but `Strategy::Correct`, and runs until the reporter's contiguous tip reaches
/// `target_height`.
///
/// Both phases configure the engine window from the same `window` value, so the two runs
/// over the same journal partitions cannot silently drift apart when the value is tuned.
fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
{
    let num_validators = 4;
    // Height the phase-one application skips.
    let skip_height = Height::new(50);
    // Engine window used in both phases and in the phase-one exit condition.
    let window = HeightDelta::new(10);
    // Contiguous tip that phase two must reach.
    let target_height = Height::new(100);
    // The fixture is generated once, outside the runtime, and shared by both phases.
    let mut rng = test_rng();
    let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);

    // Phase one: every application skips `skip_height`.
    let f = |context: Context| {
        let fixture = fixture.clone();
        async move {
            let epoch = Epoch::new(111);
            let (oracle, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                &fixture,
                RELIABLE_LINK,
            )
            .await;
            // A single reporter is shared by all engines in this run.
            let (reporter, mut reporter_mailbox) =
                mocks::Reporter::new(context.clone(), fixture.verifier.clone());
            context.with_label("reporter").spawn(|_| reporter.run());
            for (idx, participant) in fixture.participants.iter().enumerate() {
                let validator_context =
                    context.with_label(&format!("participant_{participant}"));
                let provider = mocks::Provider::new();
                assert!(provider.register(epoch, fixture.schemes[idx].clone()));
                let monitor = mocks::Monitor::new(epoch);
                let automaton = mocks::Application::new(mocks::Strategy::Skip {
                    height: skip_height,
                });
                let blocker = oracle.control(participant.clone());
                let engine = Engine::new(
                    validator_context.with_label("engine"),
                    Config {
                        monitor,
                        provider,
                        automaton,
                        reporter: reporter_mailbox.clone(),
                        blocker,
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                            100,
                        )),
                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                        window: std::num::NonZeroU64::new(window.get()).unwrap(),
                        activity_timeout: HeightDelta::new(100),
                        journal_partition: format!("unsigned_height_test_{participant}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_page_cache: CacheRef::from_pooler(
                            &context,
                            PAGE_SIZE,
                            PAGE_CACHE_SIZE,
                        ),
                        strategy: Sequential,
                    },
                );
                let (sender, receiver) = registrations.remove(participant).unwrap();
                engine.start((sender, receiver));
            }
            // Poll every 50ms until the (not necessarily contiguous) tip reaches the height
            // just before `skip_height + window`.
            loop {
                if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
                    debug!(%tip_height, %skip_height, %target_height, "reporter status");
                    if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
                        return;
                    }
                }
                context.sleep(Duration::from_millis(50)).await;
            }
        }
    };
    let (_, checkpoint) =
        deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);

    // Phase two: resume from the checkpoint with correct applications.
    let f2 = |context: Context| {
        async move {
            let epoch = Epoch::new(111);
            let (oracle, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                &fixture,
                RELIABLE_LINK,
            )
            .await;
            let (reporter, mut reporter_mailbox) =
                mocks::Reporter::new(context.clone(), fixture.verifier.clone());
            context.with_label("reporter").spawn(|_| reporter.run());
            for (idx, participant) in fixture.participants.iter().enumerate() {
                let validator_context =
                    context.with_label(&format!("participant_{participant}"));
                let provider = mocks::Provider::new();
                assert!(provider.register(epoch, fixture.schemes[idx].clone()));
                let monitor = mocks::Monitor::new(epoch);
                let automaton = mocks::Application::new(mocks::Strategy::Correct);
                let blocker = oracle.control(participant.clone());
                let engine = Engine::new(
                    validator_context.with_label("engine"),
                    Config {
                        monitor,
                        provider,
                        automaton,
                        reporter: reporter_mailbox.clone(),
                        blocker,
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                            100,
                        )),
                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                        // Same window as phase one (previously a separate literal `10`).
                        window: std::num::NonZeroU64::new(window.get()).unwrap(),
                        activity_timeout: HeightDelta::new(100),
                        // Same partition name as phase one, so the engine opens the journal
                        // written by the first run.
                        journal_partition: format!("unsigned_height_test_{participant}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_page_cache: CacheRef::from_pooler(
                            &context,
                            PAGE_SIZE,
                            PAGE_CACHE_SIZE,
                        ),
                        strategy: Sequential,
                    },
                );
                let (sender, receiver) = registrations.remove(participant).unwrap();
                engine.start((sender, receiver));
            }
            // Poll every 50ms until the contiguous tip reaches the target height.
            loop {
                if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
                    debug!(
                        %tip_height,
                        %skip_height, %target_height, "reporter status on restart"
                    );
                    if tip_height >= target_height {
                        break;
                    }
                }
                context.sleep(Duration::from_millis(50)).await;
            }
        }
    };
    deterministic::Runner::from(checkpoint).start(f2);
}
// Runs `unclean_shutdown_with_unsigned_height` once per scheme fixture: BLS12-381 threshold
// and multisig (each over `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_unclean_shutdown_with_unsigned_height() {
unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
unclean_shutdown_with_unsigned_height(ed25519::fixture);
unclean_shutdown_with_unsigned_height(secp256r1::fixture);
}
/// Runs four correct validators over degraded links (200ms latency, 150ms jitter, success
/// rate 0.5) with the given runtime `seed`, waits for every reporter to reach height 100 in
/// epoch 111, and returns the runtime auditor's state string.
///
/// The runner is configured with a 120 second timeout.
fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let runner = deterministic::Runner::new(
        deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(120))),
    );
    runner.start(|mut context| async move {
        let fixture = fixture(&mut context, TEST_NAMESPACE, 4);
        let epoch = Epoch::new(111);

        let lossy_link = Link {
            latency: Duration::from_millis(200),
            jitter: Duration::from_millis(150),
            success_rate: 0.5,
        };
        let simulation_context = context.with_label("simulation");
        let (mut oracle, mut registrations) =
            initialize_simulation(simulation_context, &fixture, lossy_link).await;

        let validator_context = context.with_label("validator");
        let reporters = spawn_validator_engines(
            validator_context,
            &fixture,
            &mut registrations,
            &mut oracle,
            epoch,
            Duration::from_secs(2),
            Vec::new(),
        );

        let target = Height::new(100);
        await_reporters(context.with_label("reporter"), &reporters, target, epoch).await;

        // The auditor state is what callers compare across runs.
        context.auditor().state()
    })
}
// Runs `slow_and_lossy_links` with seed 0 once per scheme fixture: BLS12-381 threshold and
// multisig (each over `MinPk` and `MinSig`), ed25519 and secp256r1. The returned auditor
// state is ignored here.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_slow_and_lossy_links() {
slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
slow_and_lossy_links(ed25519::fixture, 0);
slow_and_lossy_links(secp256r1::fixture, 0);
}
// For seeds 1 through 5, runs `slow_and_lossy_links` twice per scheme fixture and requires
// the two auditor states to match; then requires the states of neighbouring schemes (in
// the order listed below) to differ.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_determinism() {
    for seed in 1..6 {
        // Same scheme, same seed: the auditor state must be reproducible.
        let threshold_min_pk = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
        assert_eq!(
            threshold_min_pk,
            slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed)
        );

        let threshold_min_sig =
            slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
        assert_eq!(
            threshold_min_sig,
            slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed)
        );

        let multisig_min_pk = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
        assert_eq!(
            multisig_min_pk,
            slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed)
        );

        let multisig_min_sig =
            slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
        assert_eq!(
            multisig_min_sig,
            slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed)
        );

        let ed25519_state = slow_and_lossy_links(ed25519::fixture, seed);
        assert_eq!(ed25519_state, slow_and_lossy_links(ed25519::fixture, seed));

        let secp256r1_state = slow_and_lossy_links(secp256r1::fixture, seed);
        assert_eq!(
            secp256r1_state,
            slow_and_lossy_links(secp256r1::fixture, seed)
        );

        // Neighbouring schemes must not produce the same state.
        let labelled = [
            ("threshold-minpk", threshold_min_pk),
            ("threshold-minsig", threshold_min_sig),
            ("multisig-minpk", multisig_min_pk),
            ("multisig-minsig", multisig_min_sig),
            ("ed25519", ed25519_state),
            ("secp256r1", secp256r1_state),
        ];
        for (left, right) in labelled.iter().zip(labelled.iter().skip(1)) {
            assert_ne!(
                left.1, right.1,
                "state {} equals state {}",
                left.0, right.0
            );
        }
    }
}
/// Generates a five-validator fixture but keeps only the first four participants and
/// schemes, so the fifth validator never joins the network. Waits for the four running
/// reporters to reach height 100 in epoch 111. The runner is created with a 30 second
/// timeout.
fn one_offline<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
        let mut fixture = fixture(&mut context, TEST_NAMESPACE, 5);
        let epoch = Epoch::new(111);

        // Drop the last validator from both lists before anything is started.
        let online = 4;
        fixture.participants.truncate(online);
        fixture.schemes.truncate(online);

        let simulation_context = context.with_label("simulation");
        let (mut oracle, mut registrations) =
            initialize_simulation(simulation_context, &fixture, RELIABLE_LINK).await;

        let validator_context = context.with_label("validator");
        let reporters = spawn_validator_engines(
            validator_context,
            &fixture,
            &mut registrations,
            &mut oracle,
            epoch,
            Duration::from_secs(5),
            Vec::new(),
        );

        let target = Height::new(100);
        await_reporters(context.with_label("reporter"), &reporters, target, epoch).await;
    });
}
// Runs `one_offline` once per scheme fixture: BLS12-381 threshold and multisig (each over
// `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_one_offline() {
one_offline(bls12381_threshold::fixture::<MinPk, _>);
one_offline(bls12381_threshold::fixture::<MinSig, _>);
one_offline(bls12381_multisig::fixture::<MinPk, _>);
one_offline(bls12381_multisig::fixture::<MinSig, _>);
one_offline(ed25519::fixture);
one_offline(secp256r1::fixture);
}
fn network_partition<S, F>(fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let epoch = Epoch::new(111);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&mut registrations,
&mut oracle,
epoch,
Duration::from_secs(5),
vec![],
);
for v1 in fixture.participants.iter() {
for v2 in fixture.participants.iter() {
if v2 == v1 {
continue;
}
oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
}
}
context.sleep(Duration::from_secs(20)).await;
for v1 in fixture.participants.iter() {
for v2 in fixture.participants.iter() {
if v2 == v1 {
continue;
}
oracle
.add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
.await
.unwrap();
}
}
await_reporters(
context.with_label("reporter"),
&reporters,
Height::new(100),
epoch,
)
.await;
});
}
// Runs `network_partition` once per scheme fixture: BLS12-381 threshold and multisig (each
// over `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_traced("INFO")]
fn test_network_partition() {
network_partition(bls12381_threshold::fixture::<MinPk, _>);
network_partition(bls12381_threshold::fixture::<MinSig, _>);
network_partition(bls12381_multisig::fixture::<MinPk, _>);
network_partition(bls12381_multisig::fixture::<MinSig, _>);
network_partition(ed25519::fixture);
network_partition(secp256r1::fixture);
}
/// Generates a five-validator fixture but starts engines for only the first two
/// participants, lets the simulation run for 12 seconds, and asserts that no reporter has
/// a non-zero tip. The runner is created with a 15 second timeout.
///
/// # Panics
///
/// Panics if any started validator's reporter reports a non-zero tip.
fn insufficient_validators<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    deterministic::Runner::timed(Duration::from_secs(15)).start(|mut context| async move {
        let fixture = fixture(&mut context, TEST_NAMESPACE, 5);
        let epoch = Epoch::new(111);

        let simulation_context = context.with_label("simulation");
        let (oracle, mut registrations) =
            initialize_simulation(simulation_context, &fixture, RELIABLE_LINK).await;

        // Only the first two participants get an engine.
        let mut reporters =
            BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
        for (idx, public_key) in fixture.participants.iter().take(2).enumerate() {
            let validator = context.with_label(&format!("participant_{public_key}"));

            let provider = mocks::Provider::new();
            assert!(provider.register(epoch, fixture.schemes[idx].clone()));
            let monitor = mocks::Monitor::new(epoch);
            let automaton = mocks::Application::new(mocks::Strategy::Correct);

            let (reporter, mailbox) =
                mocks::Reporter::new(validator.clone(), fixture.verifier.clone());
            validator.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(public_key.clone(), mailbox.clone());

            let blocker = oracle.control(public_key.clone());
            let engine_context = validator.with_label("engine");
            let config = Config {
                monitor,
                provider,
                automaton,
                reporter: mailbox,
                blocker,
                priority_acks: false,
                rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                window: std::num::NonZeroU64::new(10).unwrap(),
                activity_timeout: HeightDelta::new(100),
                journal_partition: format!("aggregation-{public_key}"),
                journal_write_buffer: NZUsize!(4096),
                journal_replay_buffer: NZUsize!(4096),
                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                journal_compression: Some(3),
                journal_page_cache: CacheRef::from_pooler(&validator, PAGE_SIZE, PAGE_CACHE_SIZE),
                strategy: Sequential,
            };
            let engine = Engine::new(engine_context, config);

            let network = registrations.remove(public_key).unwrap();
            engine.start(network);
        }

        // Give the two validators time to (fail to) make progress.
        context.sleep(Duration::from_secs(12)).await;

        // Inspect every reporter so that each offender is logged before asserting.
        let mut any_consensus = false;
        for (validator_pk, mut reporter_mailbox) in reporters {
            let (tip, _) = reporter_mailbox
                .get_tip()
                .await
                .unwrap_or((Height::zero(), Epoch::zero()));
            if tip.is_zero() {
                continue;
            }
            any_consensus = true;
            tracing::warn!(
                ?validator_pk,
                %tip,
                "Unexpected consensus with insufficient validators"
            );
        }
        assert!(
            !any_consensus,
            "Consensus should not be achieved with insufficient validator participation (below quorum)"
        );
    });
}
// Runs `insufficient_validators` once per scheme fixture: BLS12-381 threshold and multisig
// (each over `MinPk` and `MinSig`), ed25519 and secp256r1.
#[test_traced("INFO")]
fn test_insufficient_validators() {
insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
insufficient_validators(ed25519::fixture);
insufficient_validators(secp256r1::fixture);
}
}