use crate::types::Round;
use commonware_cryptography::PublicKey;
pub mod elector;
pub mod scheme;
pub mod types;
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
mod actors;
pub mod config;
pub use config::{Config, ForwardingPolicy};
mod engine;
pub use engine::Engine;
mod metrics;
}
}
#[cfg(any(test, feature = "mocks"))]
pub mod mocks;
#[cfg(not(target_arch = "wasm32"))]
use crate::types::{View, ViewDelta};
/// Returns the lower bound of the active view window: `last_finalized` reduced by
/// `activity_timeout` via `saturating_sub`.
///
/// `interesting` rejects any pending view that is below this bound.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
last_finalized.saturating_sub(activity_timeout)
}
/// Decides whether activity for the view `pending` should be processed.
///
/// A view is interesting only when all of the following hold:
/// - `pending` is not view zero,
/// - `pending` is not below `min_active(activity_timeout, last_finalized)`,
/// - `allow_future` is set, or `pending` does not exceed `current.next()`.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn interesting(
    activity_timeout: ViewDelta,
    last_finalized: View,
    current: View,
    pending: View,
    allow_future: bool,
) -> bool {
    // The checks short-circuit in the same order as the guard clauses they replace.
    !pending.is_zero()
        && pending >= min_active(activity_timeout, last_finalized)
        && (allow_future || pending <= current.next())
}
/// A plan that is either a proposal or a forward of a round to a list of peers.
///
/// NOTE(review): presumably describes how a payload should be disseminated (see
/// `ForwardingPolicy` in `config`) — confirm against the code that consumes this type.
pub enum Plan<P: PublicKey> {
/// Carries no additional data.
Propose,
/// Carries the `round` concerned and the `peers` it applies to.
Forward {
round: Round,
peers: Vec<P>,
},
}
/// Test-only helper: the quorum size for `n` participants as computed by
/// `N3f1`'s implementation of `Faults::quorum`.
#[cfg(test)]
pub(crate) fn quorum(n: u32) -> u32 {
    use commonware_utils::{Faults, N3f1};
    <N3f1 as Faults>::quorum(n)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
simplex::{
elector::{Config as Elector, Random, RoundRobin},
mocks::{
scheme as scheme_mocks,
twins::{self, Elector as TwinsElector},
wrapped,
},
scheme::{
bls12381_multisig,
bls12381_threshold::{
standard as bls12381_threshold_std,
vrf::{self as bls12381_threshold_vrf, Seedable},
},
ed25519, secp256r1, Scheme,
},
types::{
Certificate, Finalization as TFinalization, Finalize as TFinalize,
Notarization as TNotarization, Notarize as TNotarize,
Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
},
},
types::{Epoch, Round},
Monitor, Viewable,
};
use commonware_codec::{Decode, DecodeExt, Encode};
use commonware_cryptography::{
bls12381::primitives::variant::{MinPk, MinSig, Variant},
certificate::mocks::Fixture,
ed25519::{PrivateKey, PublicKey},
sha256::{Digest as Sha256Digest, Digest as D},
Hasher as _, Sha256, Signer as _,
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::{
simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin},
utils::mocks::inert_channel,
Manager as _, Recipients, Sender as _, TrackedPeers,
};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
Runner, Spawner,
};
use commonware_utils::{ordered::Set, sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
use engine::Engine;
use futures::future::join_all;
use rand::{rngs::StdRng, Rng as _, SeedableRng};
use std::{
collections::{BTreeMap, HashMap, HashSet},
num::{NonZeroU16, NonZeroU32, NonZeroUsize},
sync::Arc,
time::Duration,
};
use tracing::{debug, info, warn};
use types::Activity;
// Page size and page-cache size passed to `CacheRef::from_pooler` when building each
// engine's `page_cache`.
// NOTE(review): presumably bytes per page and number of cached pages — confirm against `CacheRef`.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
// Quota used for every channel registered by `register_validator`.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Exercises `interesting` with a fixed activity timeout of 10 views.
#[test]
fn test_interesting() {
    let window = ViewDelta::new(10);
    // Every case shares the same timeout, so only the four varying inputs are spelled out.
    let verdict = |last_finalized: View, current: View, pending: View, allow_future: bool| {
        interesting(window, last_finalized, current, pending, allow_future)
    };

    // View zero is rejected regardless of `allow_future`.
    assert!(!verdict(View::zero(), View::zero(), View::zero(), false));
    assert!(!verdict(View::zero(), View::new(1), View::zero(), true));

    // With last_finalized = 20 the lower bound is view 10.
    let finalized = View::new(20);
    let current = View::new(25);
    assert!(!verdict(finalized, current, View::new(5), false));
    assert!(verdict(finalized, current, View::new(10), false));

    // Views past `current.next()` are only accepted when `allow_future` is set.
    assert!(!verdict(finalized, current, View::new(27), false));
    assert!(verdict(finalized, current, View::new(27), true));
    assert!(verdict(finalized, current, View::new(26), false));

    // Views inside the window are accepted.
    assert!(verdict(finalized, current, View::new(22), false));
    assert!(verdict(View::zero(), View::new(5), View::new(1), false));
}
/// Obtains the network control for `validator` and registers its three channels.
///
/// Channels `0`, `1` and `2` are registered in that order, each with `TEST_QUOTA`.
/// The returned tuple holds the `(sender, receiver)` pair of each channel in the same
/// order: vote, certificate, resolver.
///
/// # Panics
///
/// Panics if any of the three registrations fails.
async fn register_validator(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    validator: PublicKey,
) -> (
    (
        Sender<PublicKey, deterministic::Context>,
        Receiver<PublicKey>,
    ),
    (
        Sender<PublicKey, deterministic::Context>,
        Receiver<PublicKey>,
    ),
    (
        Sender<PublicKey, deterministic::Context>,
        Receiver<PublicKey>,
    ),
) {
    // `validator` is owned and not used again, so it is moved rather than cloned.
    let control = oracle.control(validator);
    let vote = control.register(0, TEST_QUOTA).await.unwrap();
    let certificate = control.register(1, TEST_QUOTA).await.unwrap();
    let resolver = control.register(2, TEST_QUOTA).await.unwrap();
    (vote, certificate, resolver)
}
/// Calls `register_validator` for each key in `validators`, in slice order, and
/// returns the resulting channel triples keyed by validator.
async fn register_validators(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    validators: &[PublicKey],
) -> HashMap<
    PublicKey,
    (
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
    ),
> {
    let mut by_validator = HashMap::new();
    for key in validators {
        // Register first, then store under a second clone of the key.
        let channels = register_validator(oracle, key.clone()).await;
        by_validator.insert(key.clone(), channels);
    }
    by_validator
}
/// Creates a simulated network (labelled `"network"`) seeded with `peers`, starts it,
/// and returns its oracle.
///
/// The network is configured with a 1 MiB `max_size`, the caller's
/// `disconnect_on_block` flag and a single tracked peer set.
async fn start_test_network_with_peers<I>(
    context: deterministic::Context,
    peers: I,
    disconnect_on_block: bool,
) -> Oracle<PublicKey, deterministic::Context>
where
    I: IntoIterator<Item = PublicKey>,
{
    let labeled = context.with_label("network");
    let cfg = Config {
        max_size: 1024 * 1024,
        disconnect_on_block,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, oracle) = Network::new_with_peers(labeled, cfg, peers).await;
    network.start();
    oracle
}
/// Creates a simulated network (labelled `"network"`) seeded with separate `primary`
/// and `secondary` peer lists, starts it, and returns its oracle.
///
/// The network is configured with a 1 MiB `max_size`, the caller's
/// `disconnect_on_block` flag and a single tracked peer set.
async fn start_test_network_with_split_peers<I, J>(
    context: deterministic::Context,
    primary: I,
    secondary: J,
    disconnect_on_block: bool,
) -> Oracle<PublicKey, deterministic::Context>
where
    I: IntoIterator<Item = PublicKey>,
    J: IntoIterator<Item = PublicKey>,
{
    let labeled = context.with_label("network");
    let cfg = Config {
        max_size: 1024 * 1024,
        disconnect_on_block,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, oracle) =
        Network::new_with_split_peers(labeled, cfg, primary, secondary).await;
    network.start();
    oracle
}
/// What `link_validators` does to each selected pair of validators.
enum Action {
/// Add a link with the given parameters.
Link(Link),
// `Update` removes the existing link and then adds the given one;
// `Unlink` only removes the existing link.
Update(Link), Unlink,
}
/// Applies `action` to every ordered pair of distinct validators.
///
/// When `restrict_to` is provided it is called as `f(validators.len(), i, j)` with
/// the indices of the pair, and pairs for which it returns `false` are skipped.
/// For each remaining pair any removal happens before any addition.
///
/// # Panics
///
/// Panics if the oracle fails to remove or add a link.
async fn link_validators(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    validators: &[PublicKey],
    action: Action,
    restrict_to: Option<fn(usize, usize, usize) -> bool>,
) {
    // Decode the action once: whether the old link is dropped, and what (if anything)
    // is added afterwards.
    let (drop_existing, replacement) = match &action {
        Action::Link(link) => (false, Some(link)),
        Action::Update(link) => (true, Some(link)),
        Action::Unlink => (true, None),
    };
    let total = validators.len();

    for (from_idx, from) in validators.iter().enumerate() {
        for (to_idx, to) in validators.iter().enumerate() {
            // A validator is never linked to itself.
            if to == from {
                continue;
            }
            if restrict_to.is_some_and(|allowed| !allowed(total, from_idx, to_idx)) {
                continue;
            }
            if drop_existing {
                oracle.remove_link(from.clone(), to.clone()).await.unwrap();
            }
            if let Some(link) = replacement {
                oracle
                    .add_link(from.clone(), to.clone(), link.clone())
                    .await
                    .unwrap();
            }
        }
    }
}
/// Counts the lines of `encoded` that contain every string in `patterns` and whose
/// last whitespace-separated token parses as a `u64` greater than zero.
///
/// Lines whose last token is not a valid `u64` (for example `1.5`, `-2` or text) are
/// not counted. An empty `patterns` slice matches every line.
fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
    let mut hits = 0usize;
    for line in encoded.lines() {
        if !patterns.iter().all(|needle| line.contains(needle)) {
            continue;
        }
        // Only the trailing token is inspected as the metric value.
        let value = line
            .split_whitespace()
            .next_back()
            .map(|token| token.parse::<u64>());
        if matches!(value, Some(Ok(v)) if v > 0) {
            hits += 1;
        }
    }
    hits as u32
}
/// Runs five validators on a fully linked simulated network and checks that all of
/// them reach view 100 with consistent, fault-free activity.
///
/// `fixture` produces the participants and their signing schemes; `L` is the leader
/// elector, built with `L::default()`.
///
/// Once every reporter has reported `required_containers`, each reporter is checked
/// over `View::range(View::new(1), required_containers - activity_timeout)`:
/// - no faults and no invalid activity were recorded,
/// - every view in the range was certified,
/// - at most one payload was notarized / finalized per view, with at least a quorum
///   of voters behind it,
/// - recorded notarizations and finalizations carry that same payload,
/// - nobody both nullified and finalized the same view.
///
/// Finally, no peer may have been blocked.
///
/// # Panics
///
/// Panics when any of the checks above fails.
fn all_online<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    let n = 5;
    let quorum = quorum(n) as usize;
    let required_containers = View::new(100);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(300));
    executor.start(|mut context| async move {
        // Build the participant set and per-validator schemes.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);

        // Start the network and register every validator's channels.
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;

        // Link every pair of validators with a lossless link.
        let link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &participants, Action::Link(link), None).await;

        // Start one reporter, application and engine per validator.
        let elector = L::default();
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut reporters = Vec::new();
        let mut engine_handlers = Vec::new();
        for (idx, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{}", *validator));
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(validator.clone());
            let cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: validator.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.with_label("engine"), cfg);
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));
        }

        // Wait until every reporter has reported `required_containers`.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut() {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.recv().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Only views up to this bound are inspected below.
        let latest_complete = required_containers.saturating_sub(activity_timeout);
        for reporter in reporters.iter() {
            reporter.assert_no_faults();
            reporter.assert_no_invalid();

            // Every inspected view must have been certified.
            {
                let certified = reporter.certified.lock();
                for view in View::range(View::new(1), latest_complete) {
                    if !certified.contains(&view) {
                        panic!("view: {view}");
                    }
                }
            }

            // Payload seen per view in notarize / finalize votes.
            let mut notarized = HashMap::new();
            let mut finalized = HashMap::new();

            // Notarize votes: a single payload per view, backed by a quorum.
            {
                let notarizes = reporter.notarizes.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(payloads) = notarizes.get(&view) else {
                        continue;
                    };
                    if payloads.len() > 1 {
                        panic!("view: {view}");
                    }
                    let (digest, notarizers) = payloads.iter().next().unwrap();
                    notarized.insert(view, *digest);
                    if notarizers.len() < quorum {
                        panic!("view: {view}");
                    }
                }
            }

            // Notarization certificates must carry the payload seen in the votes.
            {
                let notarizations = reporter.notarizations.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(notarization) = notarizations.get(&view) else {
                        continue;
                    };
                    let Some(digest) = notarized.get(&view) else {
                        continue;
                    };
                    assert_eq!(&notarization.proposal.payload, digest);
                }
            }

            // Finalize votes: a single payload per view, backed by a quorum, and no
            // finalizer may also have nullified the same view.
            {
                let finalizes = reporter.finalizes.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(payloads) = finalizes.get(&view) else {
                        continue;
                    };
                    if payloads.len() > 1 {
                        panic!("view: {view}");
                    }
                    let (digest, finalizers) = payloads.iter().next().unwrap();
                    finalized.insert(view, *digest);
                    // Defensive guard: views past the inspected bound are not checked further.
                    if view > latest_complete {
                        continue;
                    }
                    if finalizers.len() < quorum {
                        panic!("view: {view}");
                    }
                    let nullifies = reporter.nullifies.lock();
                    let Some(nullifies) = nullifies.get(&view) else {
                        continue;
                    };
                    for (_, finalizers) in payloads.iter() {
                        for finalizer in finalizers.iter() {
                            if nullifies.contains(finalizer) {
                                panic!("should not nullify and finalize at same view");
                            }
                        }
                    }
                }
            }

            // Finalization certificates must carry the payload seen in the votes.
            {
                let finalizations = reporter.finalizations.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(finalization) = finalizations.get(&view) else {
                        continue;
                    };
                    let Some(digest) = finalized.get(&view) else {
                        continue;
                    };
                    assert_eq!(&finalization.proposal.payload, digest);
                }
            }
        }

        // Nobody should have been blocked during a healthy run.
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
// Runs `all_online` once per signature-scheme fixture. The two `bls12381_threshold_vrf`
// fixtures use the `Random` elector; every other fixture uses `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_all_online() {
all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
all_online::<_, _, RoundRobin>(ed25519::fixture);
all_online::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Runs five validators on a network that also contains one extra "observer" key and
/// checks that every started engine reaches view 100 without faults, invalid
/// activity, or blocked peers.
///
/// `fixture` produces the participants, their signing schemes and a `verifier`
/// scheme; `L` is the leader elector, built with `L::default()`.
///
/// # Panics
///
/// Panics when a reporter records a fault or invalid activity, or when any peer was blocked.
fn observer<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n_active = 5;
let required_containers = View::new(100);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(300));
executor.start(|mut context| async move {
// Build the active participants, their schemes, and the verifier scheme.
let Fixture {
participants,
schemes,
verifier,
..
} = fixture(&mut context, &namespace, n_active);
// Derive the observer's key from a seed equal to `n_active`.
let private_key_observer = PrivateKey::from_seed(n_active as u64);
let public_key_observer = private_key_observer.public_key();
// The participants are passed as the primary peers, the observer as the only secondary peer.
let mut oracle = start_test_network_with_split_peers(
context.clone(),
participants.clone(),
[public_key_observer.clone()],
true,
)
.await;
// Channels and links are set up for the participants plus the observer.
let mut all_validators = participants.clone();
all_validators.push(public_key_observer.clone());
all_validators.sort();
let mut registrations = register_validators(&mut oracle, &all_validators).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
// NOTE(review): this loop walks `participants`, while `public_key_observer` is only
// ever pushed into `all_validators`. Unless the fixture itself returns that key,
// `is_observer` is always false and no engine is started for the observer — confirm
// whether iterating `all_validators` (with a matching scheme lookup) was intended.
for (idx, validator) in participants.iter().enumerate() {
let is_observer = *validator == public_key_observer;
let context = context.with_label(&format!("validator_{}", *validator));
// The observer would use the verifier scheme; participants use their own scheme.
let signing = if is_observer {
verifier.clone()
} else {
schemes[idx].clone()
};
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: signing.clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine.start(pending, recovered, resolver);
}
// Wait until every reporter has reported `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// Each reporter must be clean, and the blocked list is re-checked on every iteration.
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
}
});
}
// Runs `observer` once per signature-scheme fixture. The two `bls12381_threshold_vrf`
// fixtures use the `Random` elector; every other fixture uses `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_observer() {
observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
observer::<_, _, RoundRobin>(ed25519::fixture);
observer::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Repeatedly runs five validators, ending each run after a random delay and
/// resuming from the recovered checkpoint, until every validator reports view 100
/// within a single run.
///
/// `fixture` produces the participants and their signing schemes from a test RNG;
/// `L` is the leader elector, built with `L::default()`.
///
/// Each run draws a delay between 100 ms and 2 s. If the delay elapses first, the
/// run's reporters are stored in `supervised` and the loop restarts from the
/// checkpoint returned by `start_and_recover`. If all finalizers complete first,
/// every stored reporter is checked for faults and the loop ends.
///
/// The block relay is created once and shared by every run; only its registrations
/// are cleared (`deregister_all`) before each restart.
///
/// # Panics
///
/// Panics if a stored reporter recorded a fault or if any peer was blocked at the
/// end of a run.
fn unclean_shutdown<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    let n = 5;
    let required_containers = View::new(100);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();

    // State that outlives individual runs.
    let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
    let supervised = Arc::new(Mutex::new(Vec::new()));
    let mut prev_checkpoint = None;

    let mut rng = test_rng();
    let Fixture {
        participants,
        schemes,
        ..
    } = fixture(&mut rng, &namespace, n);

    // One relay for the whole test, shared across restarts.
    let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
    loop {
        let rng = rng.clone();
        let participants = participants.clone();
        let schemes = schemes.clone();
        let shutdowns = shutdowns.clone();
        let supervised = supervised.clone();
        let relay = relay.clone();
        // Drop registrations left over from the previous run.
        relay.deregister_all();

        let f = |mut context: deterministic::Context| async move {
            // Start the network, register channels and link every pair of validators.
            let mut oracle =
                start_test_network_with_peers(context.clone(), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;
            let link = Link {
                latency: Duration::from_millis(50),
                jitter: Duration::from_millis(50),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Start one reporter, application and engine per validator. The
            // applications use the shared `relay` captured from the enclosing loop.
            let elector = L::default();
            let mut reporters = HashMap::new();
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let context = context.with_label(&format!("validator_{}", *validator));
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
                reporters.insert(validator.clone(), reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.with_label("engine"), cfg);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // One task per reporter that completes once `required_containers` is reported.
            let mut finalizers = Vec::new();
            for (_, reporter) in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }

            // Race a random delay against completion of all finalizers.
            let wait =
                context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
            let result = select! {
                _ = context.sleep(wait) => {
                    // Delay elapsed first: record the shutdown and keep this run's
                    // reporters so they can be checked once the test completes.
                    {
                        let mut shutdowns = shutdowns.lock();
                        debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                        *shutdowns += 1;
                    }
                    supervised.lock().push(reporters);
                    false
                },
                _ = join_all(finalizers) => {
                    // Completed: reporters stored by earlier runs must be fault-free.
                    let supervised = supervised.lock();
                    for reporters in supervised.iter() {
                        for (_, reporter) in reporters.iter() {
                            reporter.assert_no_faults();
                        }
                    }
                    true
                },
            };

            // Nobody should have been blocked during this run.
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
            result
        };

        // The first run starts from a fresh timed runner; later runs resume from the
        // previous checkpoint.
        let (complete, checkpoint) = prev_checkpoint
            .map_or_else(
                || deterministic::Runner::timed(Duration::from_secs(180)),
                deterministic::Runner::from,
            )
            .start_and_recover(f);
        if complete {
            break;
        }
        prev_checkpoint = Some(checkpoint);
    }
}
// Runs `unclean_shutdown` once per signature-scheme fixture. The two
// `bls12381_threshold_vrf` fixtures use the `Random` elector; every other fixture uses
// `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_unclean_shutdown() {
unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Starts three of four validators, lets them reach view 100, then brings the first
/// validator online late and waits for it to reach view 100 as well.
///
/// `fixture` produces the participants and their signing schemes; `L` is the leader
/// elector, built with `L::default()`.
///
/// Sequence:
/// 1. Link and start every validator except index 0; wait for view 100.
/// 2. Replace the links among those validators with 3 s latency links and sleep 60 s.
/// 3. Remove the links between index 1 and the others (index 0 excluded).
/// 4. Link index 0 to everyone except index 1, then replace all links not touching
///    index 1 with 10 ms links.
/// 5. Start index 0's engine and wait for its reporter to reach view 100.
///
/// # Panics
///
/// Panics if any peer was blocked by the end of the run.
fn backfill<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 4;
let required_containers = View::new(100);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(240));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
// Link every pair that does not involve index 0.
link_validators(
&mut oracle,
&participants,
Action::Link(link),
Some(|_, i, j| ![i, j].contains(&0usize)),
)
.await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
// Index 0 is started later, after the others have made progress.
if idx_scheme == 0 {
continue;
}
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
// Wait until every started validator has reported `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// Replace the links among the online validators with 3 s latency links.
let link = Link {
latency: Duration::from_secs(3),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
link_validators(
&mut oracle,
&participants,
Action::Update(link.clone()),
Some(|_, i, j| ![i, j].contains(&0usize)),
)
.await;
// Let the network run in this state for 60 s.
context.sleep(Duration::from_secs(60)).await;
// Remove every link that touches index 1 (pairs involving index 0 are skipped).
link_validators(
&mut oracle,
&participants,
Action::Unlink,
Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
)
.await;
let me = participants[0].clone();
let context = context.with_label(&format!("validator_{me}"));
// Link index 0 to everyone except index 1, still using the 3 s latency link.
link_validators(
&mut oracle,
&participants,
Action::Link(link),
Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
)
.await;
// Replace every link that does not touch index 1 with a 10 ms link.
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(3),
success_rate: 1.0,
};
link_validators(
&mut oracle,
&participants,
Action::Update(link),
Some(|_, i, j| ![i, j].contains(&1usize)),
)
.await;
// Start the reporter, application and engine for index 0.
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let mut reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(me.clone());
let cfg = config::Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: me.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(&me)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
// Wait for the late validator to report `required_containers`.
let (mut latest, mut monitor) = reporter.subscribe().await;
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
// Nobody should have been blocked during the run.
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
});
}
// Runs `backfill` once per signature-scheme fixture. The two `bls12381_threshold_vrf`
// fixtures use the `Random` elector; every other fixture uses `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_backfill() {
backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
backfill::<_, _, RoundRobin>(ed25519::fixture);
backfill::<_, _, RoundRobin>(secp256r1::fixture);
}
fn one_offline<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 5;
let quorum = quorum(n) as usize;
let required_containers = View::new(100);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let max_exceptions = 10;
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(300));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(
&mut oracle,
&participants,
Action::Link(link),
Some(|_, i, j| ![i, j].contains(&0usize)),
)
.await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
if idx_scheme == 0 {
continue;
}
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let exceptions = 0;
let offline = &participants[0];
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
let mut exceptions = 0;
{
let notarizes = reporter.notarizes.lock();
for (view, payloads) in notarizes.iter() {
for (_, participants) in payloads.iter() {
if participants.contains(offline) {
panic!("view: {view}");
}
}
}
}
{
let nullifies = reporter.nullifies.lock();
for (view, participants) in nullifies.iter() {
if participants.contains(offline) {
panic!("view: {view}");
}
}
}
{
let finalizes = reporter.finalizes.lock();
for (view, payloads) in finalizes.iter() {
for (_, finalizers) in payloads.iter() {
if finalizers.contains(offline) {
panic!("view: {view}");
}
}
}
}
let mut offline_views = Vec::new();
{
let leaders = reporter.leaders.lock();
for (view, leader) in leaders.iter() {
if leader == offline {
offline_views.push(*view);
}
}
}
assert!(!offline_views.is_empty());
{
let nullifies = reporter.nullifies.lock();
for view in offline_views.iter() {
let nullifies = nullifies.get(view).map_or(0, |n| n.len());
if nullifies < quorum {
warn!("missing expected view nullifies: {}", view);
exceptions += 1;
}
}
}
{
let nullifications = reporter.nullifications.lock();
for view in offline_views.iter() {
if !nullifications.contains_key(view) {
warn!("missing expected view nullifies: {}", view);
exceptions += 1;
}
}
}
assert!(exceptions <= max_exceptions);
}
assert!(exceptions <= max_exceptions);
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
let encoded = context.encode();
let leader_label = format!("leader=\"{}\"", offline);
assert!(
count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
"expected timeout metrics for offline leader"
);
assert_eq!(
count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
n - 1,
"expected all online nodes to record _nullifications for offline leader"
);
});
}
// Runs the `one_offline` scenario once per signing-scheme fixture. The VRF
// threshold fixtures are paired with the `Random` elector; every other
// fixture is paired with `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_one_offline() {
one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
one_offline::<_, _, RoundRobin>(ed25519::fixture);
one_offline::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: five fully linked validators where participant 0 runs an
/// application configured with far larger propose/verify/certify latencies
/// than the others.
///
/// `fixture` builds the participants and per-participant signing schemes;
/// `L` is the leader-elector configuration shared by all reporters and engines.
///
/// Asserts that every reporter reaches `required_containers`, records no
/// faults or invalid messages, never lists the slow participant among the
/// recorded notarize or finalize voters, and that no peer was blocked.
fn slow_validator<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 5;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(300));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
// Same lossless link between every pair of validators (no filter passed).
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
// Participant 0 is the slow one: its latency means are 10_000 versus 10 for
// everyone else (units presumably milliseconds -- confirm against
// `mocks::application`).
let application_cfg = if idx_scheme == 0 {
mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10_000.0, 0.0),
verify_latency: (10_000.0, 5.0),
certify_latency: (10_000.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
}
} else {
mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
}
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
// One task per reporter (the slow validator's included) that completes once
// the value reported by its monitor reaches `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let slow = &participants[0];
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
{
// No recorded notarize vote may come from the slow participant.
let notarizes = reporter.notarizes.lock();
assert!(notarizes.values().all(|payloads| {
payloads
.values()
.all(|participants| !participants.contains(slow))
}));
}
{
// Likewise for recorded finalize votes.
let finalizes = reporter.finalizes.lock();
assert!(finalizes.values().all(|payloads| {
payloads
.values()
.all(|participants| !participants.contains(slow))
}));
}
{
// At least one finalization at or beyond the target view must be recorded.
let finalizations = reporter.finalizations.lock();
assert!(finalizations
.keys()
.any(|view| *view >= required_containers));
}
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
});
}
// Runs the `slow_validator` scenario once per signing-scheme fixture. The VRF
// threshold fixtures use the `Random` elector; every other fixture uses
// `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_slow_validator() {
slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
slow_validator::<_, _, RoundRobin>(ed25519::fixture);
slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: five validators that first cannot make progress and then recover.
///
/// Phases, in order:
/// 1. All pairs are linked with 3 s latency; for 60 s no reporter monitor may
///    deliver an event (the watcher task panics if one arrives).
/// 2. All links are removed and the test sleeps another 60 s, then records the
///    highest view for which any reporter holds nullify votes.
/// 3. All pairs are relinked with 10 ms latency and every reporter must reach
///    `required_containers`.
///
/// Finally asserts no faults/invalid messages, that most views in the window
/// starting at the recorded view have a notarization, and that nobody is blocked.
fn all_recovery<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 5;
let required_containers = View::new(100);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(2);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(1800));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
// Phase 1 link: the 3 s latency exceeds the 1 s leader timeout and the 2 s
// certification timeout configured for the engines below.
let link = Link {
latency: Duration::from_secs(3),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
// Phase 1 check: each watcher races a 60 s sleep against the reporter's
// monitor and panics if the monitor yields first.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (_, mut monitor) = reporter.subscribe().await;
finalizers.push(
context
.with_label("finalizer")
.spawn(move |context| async move {
select! {
_timeout = context.sleep(Duration::from_secs(60)) => {},
_done = monitor.recv() => {
panic!("engine should not notarize or finalize anything");
},
}
}),
);
}
join_all(finalizers).await;
// Phase 2: remove every link and let the validators run in isolation.
link_validators(&mut oracle, &participants, Action::Unlink, None).await;
context.sleep(Duration::from_secs(60)).await;
// Highest view with recorded nullify votes across all reporters. The `unwrap`
// panics if some reporter has recorded none at all.
let mut latest = View::zero();
for reporter in reporters.iter() {
let nullifies = reporter.nullifies.lock();
let max = nullifies.keys().max().unwrap();
if *max > latest {
latest = *max;
}
}
// Phase 3: restore connectivity with a fast link.
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
// This `latest` is local to the loop body and shadows the view recorded above;
// the outer value is unchanged.
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
{
// Count notarizations in the `activity_timeout`-sized window that starts at the
// recorded view (`View::range` is presumably half-open -- confirm); up to
// two missing views are tolerated.
let mut found = 0;
let notarizations = reporter.notarizations.lock();
for view in View::range(latest, latest.saturating_add(activity_timeout)) {
if notarizations.contains_key(&view) {
found += 1;
}
}
assert!(
found >= activity_timeout.get().saturating_sub(2),
"found: {found}"
);
}
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
});
}
// Runs the `all_recovery` scenario once per signing-scheme fixture. The VRF
// threshold fixtures use the `Random` elector; every other fixture uses
// `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_all_recovery() {
all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
all_recovery::<_, _, RoundRobin>(ed25519::fixture);
all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: ten validators that are split into two halves and later rejoined.
///
/// Phases, in order:
/// 1. Fully linked; every reporter must reach `required_containers`.
/// 2. Links between the lower and upper half of the participant indices are
///    removed; after a 10 s settle period no reporter monitor may deliver an
///    event for 60 s (the watcher task panics if one arrives).
/// 3. The removed links are restored; every reporter must advance by a further
///    `required_containers` views beyond the value it reports on resubscribe.
///
/// Finally asserts no faults, no invalid messages and no blocked peers.
fn partition<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 10;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(900));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
// Cloned because the same link description is reused when healing the partition.
link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
// Phase 1: wait for every reporter to reach the target view while fully connected.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// True when indices `a` and `b` fall on opposite sides of the midpoint `n / 2`.
fn separated(n: usize, a: usize, b: usize) -> bool {
let m = n / 2;
(a < m && b >= m) || (a >= m && b < m)
}
// Phase 2: cut only the cross-half links, then give in-flight work 10 s to settle.
link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
context.sleep(Duration::from_secs(10)).await;
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (_, mut monitor) = reporter.subscribe().await;
finalizers.push(
context
.with_label("finalizer")
.spawn(move |context| async move {
select! {
_timeout = context.sleep(Duration::from_secs(60)) => {},
_done = monitor.recv() => {
panic!("engine should not notarize or finalize anything");
},
}
}),
);
}
join_all(finalizers).await;
// Phase 3: restore exactly the links that were cut.
link_validators(
&mut oracle,
&participants,
Action::Link(link),
Some(separated),
)
.await;
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
// Target is relative to where this reporter currently stands, not absolute.
let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
});
}
// Runs the `partition` scenario once per signing-scheme fixture. The VRF
// threshold fixtures use the `Random` elector; every other fixture uses
// `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_partition() {
partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
partition::<_, _, RoundRobin>(ed25519::fixture);
partition::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: five validators connected by degraded links (200 ms latency,
/// 150 ms jitter, 0.5 success rate) that must still reach `required_containers`.
///
/// `seed` seeds the deterministic runner; `fixture` builds the participants and
/// signing schemes; `L` is the leader-elector configuration.
///
/// Asserts no faults, no invalid messages and no blocked peers, then returns
/// the runtime auditor's state string so callers can compare runs.
fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 5;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Built explicitly (rather than via `Runner::timed`) so the seed can be supplied.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(5_000)));
let executor = deterministic::Runner::new(cfg);
// No trailing semicolon: the value produced by the async block is this function's result.
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let degraded_link = Link {
latency: Duration::from_millis(200),
jitter: Duration::from_millis(150),
success_rate: 0.5,
};
link_validators(
&mut oracle,
&participants,
Action::Link(degraded_link),
None,
)
.await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
// One task per reporter that completes once its monitor reports the target view.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
// Returned to the caller as the run's state string.
context.auditor().state()
})
}
// Runs the `slow_and_lossy_links` scenario with seed 0 once per signing-scheme
// fixture; the returned auditor state is discarded here.
#[test_group("slow")]
#[test_traced]
fn test_slow_and_lossy_links() {
slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
}
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
for seed in 1..6 {
let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinPk, _>,
);
let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinPk, _>,
);
assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinSig, _>,
);
let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinSig, _>,
);
assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_multisig::fixture::<MinPk, _>,
);
let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_multisig::fixture::<MinPk, _>,
);
assert_eq!(ms_pk_state_1, ms_pk_state_2);
let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_multisig::fixture::<MinSig, _>,
);
let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
seed,
bls12381_multisig::fixture::<MinSig, _>,
);
assert_eq!(ms_sig_state_1, ms_sig_state_2);
let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
assert_eq!(ed_state_1, ed_state_2);
let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
assert_eq!(secp_state_1, secp_state_2);
let states = [
("threshold-vrf-minpk", ts_vrf_pk_state_1),
("threshold-vrf-minsig", ts_vrf_sig_state_1),
("threshold-std-minpk", ts_std_pk_state_1),
("threshold-std-minsig", ts_std_sig_state_1),
("multisig-minpk", ms_pk_state_1),
("multisig-minsig", ms_sig_state_1),
("ed25519", ed_state_1),
("secp256r1", secp_state_1),
];
for pair in states.windows(2) {
assert_ne!(
pair[0].1, pair[1].1,
"state {} equals state {}",
pair[0].0, pair[1].0
);
}
}
}
/// Scenario: four validators where participant 0 runs the mock
/// `Conflicter` engine instead of a real one.
///
/// `seed` seeds the deterministic runner; `fixture` builds the participants and
/// signing schemes; `L` is the leader-elector configuration.
///
/// Asserts that the three honest reporters reach `required_containers`, that
/// each records faults for exactly one party (participant 0) consisting only
/// of conflicting notarize/finalize activity, that no invalid messages were
/// seen, and that every blocked pair targets participant 0.
fn conflicter<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 4;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
// A reporter is constructed for every participant, but only honest ones are
// kept in `reporters` (see the `else` branch).
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
if idx_scheme == 0 {
// Byzantine participant: started with only the `pending` registration;
// `recovered` and `resolver` are left unused.
let cfg = mocks::conflicter::Config {
scheme: schemes[idx_scheme].clone(),
};
let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
mocks::conflicter::Conflicter::new(
context.with_label("byzantine_engine"),
cfg,
);
engine.start(pending);
} else {
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine.start(pending, recovered, resolver);
}
}
// Wait for every honest reporter to reach the target view.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let byz = &participants[0];
// Total conflicting-vote faults seen across all honest reporters.
let mut count_conflicting = 0;
for reporter in reporters.iter() {
{
let faults = reporter.faults.lock();
// Exactly one faulty party may be recorded, and it must be the byzantine one.
assert_eq!(faults.len(), 1);
let faulter = faults.get(byz).expect("byzantine party is not faulter");
for (_, faults) in faulter.iter() {
for fault in faults.iter() {
match fault {
Activity::ConflictingNotarize(_) => {
count_conflicting += 1;
}
Activity::ConflictingFinalize(_) => {
count_conflicting += 1;
}
_ => panic!("unexpected fault: {fault:?}"),
}
}
}
}
reporter.assert_no_invalid();
}
assert!(count_conflicting > 0);
// Blocking must have happened, and each pair's second element must be the
// byzantine participant while the first is not.
let blocked = oracle.blocked().await.unwrap();
assert!(!blocked.is_empty());
for (a, b) in blocked {
assert_ne!(&a, byz);
assert_eq!(&b, byz);
}
});
}
// Runs the `conflicter` scenario for seeds 0 through 4, once per signing-scheme
// fixture. The VRF threshold fixtures use the `Random` elector; every other
// fixture uses `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_conflicter() {
for seed in 0..5 {
conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
}
}
/// Scenario harness: four validators run the real `Engine`, but the validator at
/// index 0 signs through `wrapped::Behavior::CorruptSignature` while the others
/// use `wrapped::Behavior::Honest`.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()` and wrapped in
///   `wrapped::Config`).
///
/// Panics (fails the test) if any of the assertions at the end of the run do not
/// hold.
fn invalid<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters.
let n = 4;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 30s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// Wrap every scheme; only index 0 gets the signature-corrupting behavior.
let schemes: Vec<_> = schemes
.into_iter()
.enumerate()
.map(|(idx, scheme)| {
let is_byzantine = idx == 0;
let behavior = if is_byzantine {
wrapped::Behavior::CorruptSignature
} else {
wrapped::Behavior::Honest
};
wrapped::Scheme::new(scheme, behavior)
})
.collect();
// Bring up the simulated network and register every validator on it.
// NOTE(review): the meaning of the trailing `true` flag is defined by
// `start_test_network_with_peers`, which is outside this block.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
// Apply the same lossless 10ms (+1ms jitter) link to the validators; `None`
// means no custom link-graph filter is supplied.
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = wrapped::Config(L::default());
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
// Start one reporter, one mock application and one engine per validator
// (the byzantine validator at index 0 included).
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.clone().to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine.start(pending, recovered, resolver);
}
// Wait on every reporter except index 0 (the byzantine one) until its
// subscription reports a view of at least `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut().skip(1) {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// Every reporter must have recorded no faults and at least one invalid
// vote; only the byzantine reporter may (and must) have counted invalid
// certificates.
for (i, reporter) in reporters.iter().enumerate() {
reporter.assert_no_faults();
assert!(*reporter.invalid_votes.lock() > 0);
let is_byzantine = i == 0;
if is_byzantine {
assert!(*reporter.invalid_certificates.lock() > 0);
} else {
assert_eq!(*reporter.invalid_certificates.lock(), 0);
}
}
// The block list must hold at least `n - 1` entries and the second element
// of every entry must be the byzantine validator.
// NOTE(review): entries are presumably (blocker, blocked) pairs - confirm
// against the oracle's `blocked()` API.
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.len() >= participants.len() - 1);
let byz = &participants[0];
for (_, b) in blocked {
assert_eq!(&b, byz);
}
});
}
#[test_group("slow")]
#[test_traced]
fn test_invalid() {
for seed in 0..5 {
invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
}
}
/// Scenario harness: four validators run the real `Engine`, but validator 0 is
/// linked only to validator 1. After validator 1's reporter has reached the
/// target view, the test checks that validator 0's reporter has recorded almost
/// as many notarizations and finalizations as validator 1's.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()`).
///
/// Panics (fails the test) if the poorly connected reporter trails the
/// well-connected one by more than two views for either certificate kind.
fn received_certificates_are_reported<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters (a short run: target view 10).
let n = 4;
let required_containers = View::new(10);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 30s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// NOTE(review): this harness passes `false` as the trailing flag; its meaning
// is defined by `start_test_network_with_peers`, outside this block.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), false).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(100),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
// Link filter: a pair that involves index 0 is linked only when the other
// endpoint is index 1; every other pair is linked. The first argument is
// ignored (presumably the participant count - confirm against
// `link_validators`).
fn link_graph(_: usize, i: usize, j: usize) -> bool {
if i == 0 || j == 0 {
return i == 1 || j == 1;
}
true
}
link_validators(
&mut oracle,
&participants,
Action::Link(link),
Some(link_graph),
)
.await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
// Start one reporter, one mock application and one engine per validator.
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.clone().to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine.start(pending, recovered, resolver);
}
// `excluded_reporter` belongs to the validator with the single link (index 0);
// `honest_reporter` belongs to its only neighbor (index 1).
let excluded_reporter = reporters[0].clone();
let mut honest_reporter = reporters[1].clone();
// Block until the well-connected reporter's subscription reports a view of at
// least `required_containers`.
let (mut honest_latest, mut honest_monitor) = honest_reporter.subscribe().await;
while honest_latest < required_containers {
honest_latest = honest_monitor.recv().await.expect("event missing");
}
// Allow one more second of simulated time before sampling the reporters.
context.sleep(Duration::from_secs(1)).await;
// Count the views in `View::range(1, required_containers.next())` for which
// each reporter holds a notarization.
// NOTE(review): presumably a half-open range, i.e. views 1 through
// `required_containers` inclusive - confirm against `View::range`.
let honest_notarized = {
let notarizations = honest_reporter.notarizations.lock();
View::range(View::new(1), required_containers.next())
.filter(|view| notarizations.contains_key(view))
.count()
};
let excluded_notarized = {
let notarizations = excluded_reporter.notarizations.lock();
View::range(View::new(1), required_containers.next())
.filter(|view| notarizations.contains_key(view))
.count()
};
// The poorly connected reporter may trail by at most two notarized views.
assert!(
excluded_notarized >= honest_notarized.saturating_sub(2),
"honest_notarized: {honest_notarized}, excluded_notarized: {excluded_notarized}"
);
// Same comparison for finalizations.
let honest_finalized = {
let finalizations = honest_reporter.finalizations.lock();
View::range(View::new(1), required_containers.next())
.filter(|view| finalizations.contains_key(view))
.count()
};
let excluded_finalized = {
let finalizations = excluded_reporter.finalizations.lock();
View::range(View::new(1), required_containers.next())
.filter(|view| finalizations.contains_key(view))
.count()
};
assert!(
excluded_finalized >= honest_finalized.saturating_sub(2),
"honest_finalized: {honest_finalized}, excluded_finalized: {excluded_finalized}"
);
});
}
// Runs the `received_certificates_are_reported` scenario once (seed 0) per
// signing-scheme fixture: the VRF threshold fixtures use the `Random` elector,
// all others use `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_received_certificates_are_reported() {
    const SEED: u64 = 0;
    received_certificates_are_reported::<_, _, Random>(
        SEED,
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    received_certificates_are_reported::<_, _, Random>(
        SEED,
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    received_certificates_are_reported::<_, _, RoundRobin>(
        SEED,
        bls12381_threshold_std::fixture::<MinPk, _>,
    );
    received_certificates_are_reported::<_, _, RoundRobin>(
        SEED,
        bls12381_threshold_std::fixture::<MinSig, _>,
    );
    received_certificates_are_reported::<_, _, RoundRobin>(
        SEED,
        bls12381_multisig::fixture::<MinPk, _>,
    );
    received_certificates_are_reported::<_, _, RoundRobin>(
        SEED,
        bls12381_multisig::fixture::<MinSig, _>,
    );
    received_certificates_are_reported::<_, _, RoundRobin>(SEED, ed25519::fixture);
    received_certificates_are_reported::<_, _, RoundRobin>(SEED, secp256r1::fixture);
}
/// Scenario harness: validator 0 runs `mocks::impersonator::Impersonator` instead
/// of the real `Engine`; the remaining three validators run the real engine.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()`).
///
/// Panics (fails the test) if an honest reporter recorded a fault or invalid
/// item, if nothing was blocked, or if any block-list entry does not have the
/// expected shape.
fn impersonator<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters.
let n = 4;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 30s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// Simulated network with a lossless 10ms (+1ms jitter) link and no custom
// link-graph filter.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
// Only honest reporters are collected here (see the `else` branch below).
let mut reporters = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
if idx_scheme == 0 {
// Byzantine validator: started with the `pending` registration only; its
// reporter is not tracked.
let cfg = mocks::impersonator::Config {
scheme: schemes[idx_scheme].clone(),
};
let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
mocks::impersonator::Impersonator::new(
context.with_label("byzantine_engine"),
cfg,
);
engine.start(pending);
} else {
// Honest validator: reporter + mock application + real engine.
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.clone().to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine.start(pending, recovered, resolver);
}
}
// Wait until every honest reporter's subscription reports a view of at least
// `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let byz = &participants[0];
// Honest reporters must have recorded neither faults nor invalid items.
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
// Something must have been blocked; in every entry the first element is not
// the byzantine validator and the second element is.
// NOTE(review): entries are presumably (blocker, blocked) pairs - confirm
// against the oracle's `blocked()` API.
let blocked = oracle.blocked().await.unwrap();
assert!(!blocked.is_empty());
for (a, b) in blocked {
assert_ne!(&a, byz);
assert_eq!(&b, byz);
}
});
}
// Runs the `impersonator` scenario for seeds 0 through 4 against every
// signing-scheme fixture: the VRF threshold fixtures use the `Random` elector,
// all others use `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_impersonator() {
    (0..5).for_each(|seed| {
        impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
        impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
        impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
        impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
        impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
        impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
        impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
        impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
    });
}
/// Scenario harness: seven validators, of which validator 0 runs
/// `mocks::equivocator::Equivocator` and the rest run the real `Engine`. The run
/// has three phases: (1) all validators make progress, (2) one randomly chosen
/// honest validator is aborted while the rest keep going, (3) that validator is
/// restarted and everyone (including it) must reach a later view.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()`).
///
/// Returns `true` when the oracle's block list is non-empty at the end of the
/// run, `false` otherwise. Panics (fails the test) if a block-list entry does
/// not have the expected shape.
fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters (larger validator set and longer timeout than the
// sibling harnesses).
let n = 7;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 60s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(60)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// Simulated network with a lossless 10ms (+1ms jitter) link and no custom
// link-graph filter.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
// `engines` and `reporters` are index-aligned with `participants` until the
// abort below removes one entry from each.
let mut engines = Vec::new();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
if idx_scheme == 0 {
// Byzantine validator: equivocating mock engine.
let cfg = mocks::equivocator::Config {
scheme: schemes[idx_scheme].clone(),
epoch: Epoch::new(333),
relay: relay.clone(),
hasher: Sha256::default(),
elector: elector.clone(),
};
let engine = mocks::equivocator::Equivocator::new(
context.with_label("byzantine_engine"),
cfg,
);
engines.push(engine.start(pending, recovered));
} else {
// Honest validator: mock application + real engine.
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engines.push(engine.start(pending, recovered, resolver));
}
}
// Phase 1: wait until every reporter except index 0 (the byzantine one)
// reports a view of at least `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut().skip(1) {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// Pick a victim among the honest validators (range starts at 1, so the
// byzantine validator at index 0 is never chosen), abort its engine and drop
// its reporter.
let idx = context.gen_range(1..engines.len()); let validator = &participants[idx];
let handle = engines.remove(idx);
handle.abort();
let _ = handle.await;
reporters.remove(idx);
info!(idx, ?validator, "aborted validator");
// Phase 2: the remaining honest reporters must reach twice the target view
// while the victim is offline.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut().skip(1) {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < View::new(required_containers.get() * 2) {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
info!(idx, ?validator, "restarting validator");
// Restart the victim under a fresh context label with a new reporter, a new
// network registration and a new mock application. Its storage `partition`
// below is the same string used for the original instance.
// Note: `context` is shadowed from here on, so the phase-3 finalizer tasks are
// labelled beneath the restarted validator's context.
let context = context.with_label(&format!("validator_{}_restarted", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let (pending, recovered, resolver) =
register_validator(&mut oracle, validator.clone()).await;
// The restarted reporter is appended, so phase 3 waits on it as well.
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine.start(pending, recovered, resolver);
// Phase 3: every honest reporter, including the restarted one, must reach
// three times the target view.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut().skip(1) {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < View::new(required_containers.get() * 3) {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
// In every block-list entry the first element must not be the byzantine
// validator and the second element must be. An empty list is tolerated here;
// emptiness is reported to the caller through the return value instead.
// NOTE(review): entries are presumably (blocker, blocked) pairs - confirm
// against the oracle's `blocked()` API.
let byz = &participants[0];
let blocked = oracle.blocked().await.unwrap();
for (a, b) in &blocked {
assert_ne!(a, byz);
assert_eq!(b, byz);
}
!blocked.is_empty()
})
}
// Tries seeds 0 through 4 in order with the BLS12-381 threshold VRF (MinPk)
// fixture and the `Random` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_threshold_vrf_min_pk() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the BLS12-381 threshold VRF (MinSig)
// fixture and the `Random` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_threshold_vrf_min_sig() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the standard BLS12-381 threshold (MinPk)
// fixture and the `RoundRobin` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_threshold_std_min_pk() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the standard BLS12-381 threshold (MinSig)
// fixture and the `RoundRobin` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_threshold_std_min_sig() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the BLS12-381 multisig (MinPk) fixture
// and the `RoundRobin` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_multisig_min_pk() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the BLS12-381 multisig (MinSig) fixture
// and the `RoundRobin` elector, stopping at the first seed for which
// `equivocator` returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_bls12381_multisig_min_sig() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the ed25519 fixture and the `RoundRobin`
// elector, stopping at the first seed for which `equivocator` returns true; at
// least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_ed25519() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, ed25519::fixture) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
// Tries seeds 0 through 4 in order with the secp256r1 fixture and the
// `RoundRobin` elector, stopping at the first seed for which `equivocator`
// returns true; at least one seed must do so.
#[test_group("slow")]
#[test_traced]
fn test_equivocator_secp256r1() {
    let mut detected = false;
    for seed in 0..5 {
        if equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture) {
            detected = true;
            break;
        }
    }
    assert!(
        detected,
        "expected at least one seed to detect equivocation"
    );
}
/// Scenario harness: validator 0 runs `mocks::reconfigurer::Reconfigurer` instead
/// of the real `Engine`; the remaining three validators run the real engine.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()`).
///
/// Panics (fails the test) if an honest reporter recorded a fault or invalid
/// item, if nothing was blocked, or if any block-list entry does not have the
/// expected shape.
fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters.
let n = 4;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 30s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// Simulated network with a lossless 10ms (+1ms jitter) link and no custom
// link-graph filter.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
// Only honest reporters are collected here (see the `else` branch below).
let mut reporters = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
if idx_scheme == 0 {
// Byzantine validator: started with the `pending` registration only; its
// reporter is not tracked.
let cfg = mocks::reconfigurer::Config {
scheme: schemes[idx_scheme].clone(),
};
let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
mocks::reconfigurer::Reconfigurer::new(
context.with_label("byzantine_engine"),
cfg,
);
engine.start(pending);
} else {
// Honest validator: reporter + mock application + real engine.
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine.start(pending, recovered, resolver);
}
}
// Wait until every honest reporter's subscription reports a view of at least
// `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let byz = &participants[0];
// Honest reporters must have recorded neither faults nor invalid items.
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
// Something must have been blocked; in every entry the first element is not
// the byzantine validator and the second element is.
// NOTE(review): entries are presumably (blocker, blocked) pairs - confirm
// against the oracle's `blocked()` API.
let blocked = oracle.blocked().await.unwrap();
assert!(!blocked.is_empty());
for (a, b) in blocked {
assert_ne!(&a, byz);
assert_eq!(&b, byz);
}
});
}
// Runs the `reconfigurer` scenario for seeds 0 through 4 against every
// signing-scheme fixture: the VRF threshold fixtures use the `Random` elector,
// all others use `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_reconfigurer() {
    (0..5).for_each(|seed| {
        reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
        reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
        reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
        reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
        reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
        reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
        reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
        reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
    });
}
/// Scenario harness: validator 0 runs `mocks::nuller::Nuller` instead of the real
/// `Engine`; the remaining three validators run the real engine. Unlike the
/// sibling harnesses, this one expects the honest reporters to have recorded
/// faults - all of kind `Activity::NullifyFinalize` and all attributed to
/// validator 0.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participant set and per-participant signing schemes.
/// * `L` - leader elector type (instantiated via `L::default()`).
///
/// Panics (fails the test) if the recorded faults, invalid counters or block
/// list do not match those expectations.
fn nuller<S, F, L>(seed: u64, mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Scenario parameters.
let n = 4;
let required_containers = View::new(50);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
// Deterministic runtime seeded with `seed` and configured with a 30s timeout.
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
// Build participants and their signing schemes.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
// Simulated network with a lossless 10ms (+1ms jitter) link and no custom
// link-graph filter.
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
// Only honest reporters are collected here (see the `else` branch below).
let mut reporters = Vec::new();
for (idx_scheme, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
if idx_scheme == 0 {
// Byzantine validator: started with the `pending` registration only; its
// reporter is not tracked.
let cfg = mocks::nuller::Config {
scheme: schemes[idx_scheme].clone(),
};
let engine: mocks::nuller::Nuller<_, _, Sha256> =
mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
engine.start(pending);
} else {
// Honest validator: reporter + mock application + real engine.
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx_scheme].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.clone().to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine.start(pending, recovered, resolver);
}
}
// Wait until every honest reporter's subscription reports a view of at least
// `required_containers`.
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
let byz = &participants[0];
// Tally of `NullifyFinalize` faults summed across all honest reporters.
let mut count_nullify_and_finalize = 0;
for reporter in reporters.iter() {
// Inner scope so the `faults` lock guard is dropped before
// `assert_no_invalid` runs.
{
let faults = reporter.faults.lock();
// Exactly one faulty party may be recorded, and it must be validator 0.
assert_eq!(faults.len(), 1);
let faulter = faults.get(byz).expect("byzantine party is not faulter");
for (_, faults) in faulter.iter() {
for fault in faults.iter() {
match fault {
Activity::NullifyFinalize(_) => {
count_nullify_and_finalize += 1;
}
// Any other fault kind fails the test.
_ => panic!("unexpected fault: {fault:?}"),
}
}
}
}
reporter.assert_no_invalid();
}
assert!(count_nullify_and_finalize > 0);
// Something must have been blocked; in every entry the first element is not
// the byzantine validator and the second element is.
// NOTE(review): entries are presumably (blocker, blocked) pairs - confirm
// against the oracle's `blocked()` API.
let blocked = oracle.blocked().await.unwrap();
assert!(!blocked.is_empty());
for (a, b) in blocked {
assert_ne!(&a, byz);
assert_eq!(&b, byz);
}
});
}
// Runs the `nuller` scenario for seeds 0 through 4 against every signing-scheme
// fixture: the VRF threshold fixtures use the `Random` elector, all others use
// `RoundRobin`.
#[test_group("slow")]
#[test_traced]
fn test_nuller() {
    (0..5).for_each(|seed| {
        nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
        nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
        nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
        nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
        nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
        nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
        nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
        nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
    });
}
/// Checks that honest validators keep making progress while one participant runs
/// the `mocks::outdated::Outdated` engine instead of a real one.
///
/// Four participants are built from `fixture` on a deterministic runtime seeded
/// with `seed` (30 second timeout). Participant 0 runs the mock, configured with
/// a `view_delta` of four times the activity timeout; the other three run real
/// engines. The test passes once every honest reporter's monitor has reached
/// view 100, `assert_no_faults` / `assert_no_invalid` hold for each honest
/// reporter, and the network oracle reports no blocked peers.
fn outdated<S, F, L>(seed: u64, mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    // Scenario parameters.
    let n = 4;
    let required_containers = View::new(100);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();

    // Seeded deterministic runtime.
    let runner_cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let runner = deterministic::Runner::new(runner_cfg);
    runner.start(|mut context| async move {
        // Participants and one signing scheme per participant.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);

        // Simulated network: start it, register every validator, then link them.
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;
        let reliable_link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &participants,
            Action::Link(reliable_link),
            None,
        )
        .await;

        let elector = L::default();
        let relay = Arc::new(mocks::relay::Relay::new());

        // Only reporters of honest validators are collected here.
        let mut reporters = Vec::new();
        for (idx, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{validator}"));

            // A reporter is constructed for every participant, including the mock one.
            let reporter_cfg = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");

            match idx {
                0 => {
                    // Participant 0 runs the `Outdated` mock, which is only handed
                    // the `pending` channel.
                    let byzantine_cfg = mocks::outdated::Config {
                        scheme: schemes[idx].clone(),
                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
                    };
                    let byzantine: mocks::outdated::Outdated<_, _, Sha256> =
                        mocks::outdated::Outdated::new(
                            context.with_label("byzantine_engine"),
                            byzantine_cfg,
                        );
                    byzantine.start(pending);
                }
                _ => {
                    // Everyone else runs the real engine backed by the mock application.
                    reporters.push(reporter.clone());
                    let app_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Sometimes,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        app_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let engine_cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: Epoch::new(333),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_secs(2),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: 4,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.with_label("engine"), engine_cfg);
                    engine.start(pending, recovered, resolver);
                }
            }
        }

        // One task per honest reporter, each finishing once its monitor has
        // reported at least `required_containers`.
        let mut finalizers = Vec::new();
        for reporter in &mut reporters {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            let waiter = context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.recv().await.expect("event missing");
                }
            });
            finalizers.push(waiter);
        }
        join_all(finalizers).await;

        // Honest reporters must be clean.
        for reporter in &reporters {
            reporter.assert_no_faults();
            reporter.assert_no_invalid();
        }

        // Nobody may have been blocked.
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
#[test_group("slow")]
#[test_traced]
fn test_outdated() {
for seed in 0..5 {
outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
}
}
fn run_1k<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
let n = 10;
let required_containers = View::new(1_000);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let cfg = deterministic::Config::new();
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
let link = Link {
latency: Duration::from_millis(80),
jitter: Duration::from_millis(10),
success_rate: 0.98,
};
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
for (idx, validator) in participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{}", *validator));
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (100.0, 50.0),
verify_latency: (50.0, 40.0),
certify_latency: (50.0, 40.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(pending, recovered, resolver));
}
let mut finalizers = Vec::new();
for reporter in reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
while latest < required_containers {
latest = monitor.recv().await.expect("event missing");
}
}));
}
join_all(finalizers).await;
for reporter in reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty());
})
}
/// `run_1k` with the BLS12-381 threshold VRF fixture (`MinPk`) and the `Random` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_vrf_min_pk() {
    let fixture = bls12381_threshold_vrf::fixture::<MinPk, _>;
    run_1k::<_, _, Random>(fixture);
}
/// `run_1k` with the BLS12-381 threshold VRF fixture (`MinSig`) and the `Random` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_vrf_min_sig() {
    let fixture = bls12381_threshold_vrf::fixture::<MinSig, _>;
    run_1k::<_, _, Random>(fixture);
}
/// `run_1k` with the standard BLS12-381 threshold fixture (`MinPk`) and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_std_min_pk() {
    let fixture = bls12381_threshold_std::fixture::<MinPk, _>;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// `run_1k` with the standard BLS12-381 threshold fixture (`MinSig`) and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_std_min_sig() {
    let fixture = bls12381_threshold_std::fixture::<MinSig, _>;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// `run_1k` with the BLS12-381 multisig fixture (`MinPk`) and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_multisig_min_pk() {
    let fixture = bls12381_multisig::fixture::<MinPk, _>;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// `run_1k` with the BLS12-381 multisig fixture (`MinSig`) and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_multisig_min_sig() {
    let fixture = bls12381_multisig::fixture::<MinSig, _>;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// `run_1k` with the ed25519 fixture and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_ed25519() {
    let fixture = ed25519::fixture;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// `run_1k` with the secp256r1 fixture and the `RoundRobin` elector.
#[test_group("slow")]
#[test_traced]
fn test_1k_secp256r1() {
    let fixture = secp256r1::fixture;
    run_1k::<_, _, RoundRobin>(fixture);
}
/// Verifies that no task labelled `engine` is left running after a single-validator
/// engine is shut down.
///
/// A lone engine is started on a deterministic runtime seeded with `seed`
/// (10 second timeout). The running-task count for the `"engine"` label must be
/// positive after 1000ms and again after a further 1500ms. Then:
///
/// * `graceful == true`: the runtime is stopped via `context.stop` with a 5
///   second deadline, which must return `Ok`.
/// * `graceful == false`: the engine handle is aborted and awaited, followed by
///   a 1000ms sleep.
///
/// In both cases the `"engine"` running-task count must end at zero.
fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    let n = 1;
    let namespace = b"consensus".to_vec();
    let runner_cfg = deterministic::Config::default()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(10)));
    let runner = deterministic::Runner::new(runner_cfg);
    runner.start(|mut context| async move {
        // Single participant and its signing scheme.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let validator = &participants[0];
        let scheme = &schemes[0];

        // Simulated network: start it, register the validator, then apply links.
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;
        let instant_link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &participants,
            Action::Link(instant_link),
            None,
        )
        .await;

        // Reporter.
        let elector = L::default();
        let reporter_cfg = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: scheme.clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);

        // Mock application acting as both automaton and relay.
        let relay = Arc::new(mocks::relay::Relay::new());
        let app_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: validator.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Sometimes,
        };
        let (actor, application) =
            mocks::application::Application::new(context.with_label("application"), app_cfg);
        actor.start();

        // Engine under test; its tasks live under the "engine" label.
        let blocker = oracle.control(validator.clone());
        let engine_cfg = config::Config {
            scheme: scheme.clone(),
            elector: elector.clone(),
            blocker,
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            strategy: Sequential,
            partition: validator.to_string(),
            mailbox_size: 64,
            epoch: Epoch::new(333),
            leader_timeout: Duration::from_millis(50),
            certification_timeout: Duration::from_millis(100),
            timeout_retry: Duration::from_millis(250),
            fetch_timeout: Duration::from_millis(50),
            activity_timeout: ViewDelta::new(4),
            skip_timeout: ViewDelta::new(2),
            fetch_concurrent: 4,
            replay_buffer: NZUsize!(1024 * 16),
            write_buffer: NZUsize!(1024 * 16),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            forwarding: ForwardingPolicy::Disabled,
        };
        let engine = Engine::new(context.with_label("engine"), engine_cfg);
        let (pending, recovered, resolver) = registrations
            .remove(validator)
            .expect("validator should be registered");
        let handle = engine.start(pending, recovered, resolver);

        // Give the engine time to spin up, then confirm it has running tasks.
        context.sleep(Duration::from_millis(1000)).await;
        assert!(
            count_running_tasks(&context, "engine") > 0,
            "at least one engine task should be running"
        );

        // Confirm it is still alive a while later.
        context.sleep(Duration::from_millis(1500)).await;
        let still_running = count_running_tasks(&context, "engine");
        assert!(still_running > 0, "engine tasks should still be running");

        // Shut down and count what is left.
        let running_after = if graceful {
            // A clone is taken first so tasks can still be counted after
            // `context` is handed to `stop`.
            let metrics_view = context.clone();
            let result = context.stop(0, Some(Duration::from_secs(5))).await;
            assert!(
                result.is_ok(),
                "graceful shutdown should complete: {result:?}"
            );
            count_running_tasks(&metrics_view, "engine")
        } else {
            handle.abort();
            let _ = handle.await;
            context.sleep(Duration::from_millis(1000)).await;
            count_running_tasks(&context, "engine")
        };
        assert_eq!(
            running_after, 0,
            "all engine tasks should be stopped, but {running_after} still running"
        );
    });
}
/// Runs `engine_shutdown` in abort mode (`graceful = false`) for seeds `0..10`
/// against every signing-scheme fixture.
#[test_group("slow")]
#[test_traced]
fn test_children_shutdown_on_engine_abort() {
    // `false` selects the branch of `engine_shutdown` that aborts the engine handle.
    const GRACEFUL: bool = false;

    for seed in 0..10 {
        // Threshold VRF variants use the `Random` elector.
        engine_shutdown::<_, _, Random>(
            seed,
            bls12381_threshold_vrf::fixture::<MinPk, _>,
            GRACEFUL,
        );
        engine_shutdown::<_, _, Random>(
            seed,
            bls12381_threshold_vrf::fixture::<MinSig, _>,
            GRACEFUL,
        );

        // Everything else uses `RoundRobin`.
        engine_shutdown::<_, _, RoundRobin>(
            seed,
            bls12381_threshold_std::fixture::<MinPk, _>,
            GRACEFUL,
        );
        engine_shutdown::<_, _, RoundRobin>(
            seed,
            bls12381_threshold_std::fixture::<MinSig, _>,
            GRACEFUL,
        );
        engine_shutdown::<_, _, RoundRobin>(
            seed,
            bls12381_multisig::fixture::<MinPk, _>,
            GRACEFUL,
        );
        engine_shutdown::<_, _, RoundRobin>(
            seed,
            bls12381_multisig::fixture::<MinSig, _>,
            GRACEFUL,
        );
        engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, GRACEFUL);
        engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, GRACEFUL);
    }
}
#[test_group("slow")]
#[test_traced]
fn test_graceful_shutdown() {
for seed in 0..10 {
engine_shutdown::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinPk, _>,
true,
);
engine_shutdown::<_, _, Random>(
seed,
bls12381_threshold_vrf::fixture::<MinSig, _>,
true,
);
engine_shutdown::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinPk, _>,
true,
);
engine_shutdown::<_, _, RoundRobin>(
seed,
bls12381_threshold_std::fixture::<MinSig, _>,
true,
);
engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
engine_shutdown::<_, _, RoundRobin>(
seed,
bls12381_multisig::fixture::<MinSig, _>,
true,
);
engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
}
}
/// Checks what per-signer activity reaches a reporter that sits behind
/// `scheme::reporter::AttributableReporter`.
///
/// Three engines run (30 second runtime budget), each reporting through an
/// `AttributableReporter` that wraps a mock reporter. Once every mock
/// reporter's monitor has reached view 10, each mock reporter must:
///
/// * satisfy `assert_no_faults` / `assert_no_invalid`;
/// * hold at least one notarization or finalization;
/// * when `S::is_attributable()` is true, hold more than one signer in total
///   for every notarize view (the highest recorded view is skipped) and for
///   every finalize view; otherwise hold zero signers for those views.
///
/// Finally, the oracle must report no blocked peers.
fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    // Scenario parameters.
    let n = 3;
    let required_containers = View::new(10);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();

    let runner = deterministic::Runner::timed(Duration::from_secs(30));
    runner.start(|mut context| async move {
        // Participants and one signing scheme per participant.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);

        // Simulated network: start it, register every validator, then link them.
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), false).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;
        let reliable_link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &participants,
            Action::Link(reliable_link),
            None,
        )
        .await;

        let elector = L::default();
        let relay = Arc::new(mocks::relay::Relay::new());

        // The mock reporters (not the wrappers) are kept for the assertions below.
        let mut reporters = Vec::new();
        for (idx, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{validator}"));

            // Inner mock reporter that records whatever the wrapper forwards.
            let reporter_cfg = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let mock_reporter = mocks::reporter::Reporter::new(
                context.with_label("mock_reporter"),
                reporter_cfg,
            );

            // Wrapper that the engine actually reports to.
            let attributable_reporter = scheme::reporter::AttributableReporter::new(
                context.with_label("rng"),
                schemes[idx].clone(),
                mock_reporter.clone(),
                Sequential,
                true,
            );
            reporters.push(mock_reporter.clone());

            // Mock application acting as both automaton and relay.
            let app_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) =
                mocks::application::Application::new(context.with_label("application"), app_cfg);
            actor.start();

            // Engine under test.
            let blocker = oracle.control(validator.clone());
            let engine_cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: attributable_reporter,
                strategy: Sequential,
                partition: validator.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.with_label("engine"), engine_cfg);
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            engine.start(pending, recovered, resolver);
        }

        // One task per reporter, each finishing once its monitor has reported
        // at least `required_containers`.
        let mut finalizers = Vec::new();
        for reporter in &mut reporters {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            let waiter = context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.recv().await.expect("event missing");
                }
            });
            finalizers.push(waiter);
        }
        join_all(finalizers).await;

        // Whether the scheme under test exposes individual signers.
        let attributable = S::is_attributable();
        for reporter in &reporters {
            reporter.assert_no_faults();
            reporter.assert_no_invalid();

            // At least one certificate of either kind must have been reported.
            {
                let notarizations = reporter.notarizations.lock();
                let finalizations = reporter.finalizations.lock();
                assert!(
                    !(notarizations.is_empty() && finalizations.is_empty()),
                    "Certificates should be reported"
                );
            }

            // Notarize votes; the highest recorded view is skipped.
            let notarizes = reporter.notarizes.lock();
            let last_view = notarizes.keys().max().cloned().unwrap_or_default();
            for (view, payloads) in notarizes.iter().filter(|(view, _)| **view != last_view) {
                let signers: usize = payloads.values().map(|set| set.len()).sum();
                if attributable {
                    assert!(signers > 1, "view {view}: {signers}");
                } else {
                    assert_eq!(signers, 0);
                }
            }

            // Finalize votes; every recorded view is checked.
            let finalizes = reporter.finalizes.lock();
            for payloads in finalizes.values() {
                let signers: usize = payloads.values().map(|set| set.len()).sum();
                if attributable {
                    assert!(signers > 1);
                } else {
                    assert_eq!(signers, 0);
                }
            }
        }

        // Nobody may have been blocked.
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
#[test_traced]
fn test_attributable_reporter_filtering() {
attributable_reporter_filtering::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
attributable_reporter_filtering::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
attributable_reporter_filtering::<_, _, RoundRobin>(
bls12381_threshold_std::fixture::<MinPk, _>,
);
attributable_reporter_filtering::<_, _, RoundRobin>(
bls12381_threshold_std::fixture::<MinSig, _>,
);
attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
attributable_reporter_filtering::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Feeds honest validators conflicting certificates for two consecutive views and
/// checks that they still advance afterwards.
///
/// Ten participants are created: indices 0..3 (`Group1`), 3..6 (`Group2`),
/// 6 (`Ignorant`) run real engines; indices 7..10 (`Byzantine`) run the
/// `mocks::nullify_only::NullifyOnly` mock. Before any engine starts, an extra
/// "injector" peer sends pre-built certificates:
///
/// * view `f_view` (1): a notarization and a finalization, to everyone;
/// * view `f_view + 1`: a notarization to `Group1`, a nullification to everyone else;
/// * view `f_view + 2`: a notarization to `Group2`, a nullification to everyone else.
///
/// After the engines have run for 2 seconds, each honest reporter is checked to
/// hold exactly the certificates it was sent and no nullify votes for view
/// `f_view + 3`. Only then is `link_validators` called, and the test waits for
/// every honest reporter's monitor to reach `f_view + 3`. It ends by asserting
/// no faults, no invalid activity, and no blocked peers.
fn split_views_no_lockup<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Role of each participant in the scenario.
enum ParticipantType {
Group1, Group2, Ignorant, Byzantine, }
// Roles are assigned purely by participant index.
let get_type = |idx: usize| -> ParticipantType {
match idx {
0..3 => ParticipantType::Group1,
3..6 => ParticipantType::Group2,
6 => ParticipantType::Ignorant,
7..10 => ParticipantType::Byzantine,
_ => unreachable!(),
}
};
// The scenario is laid out for exactly 10 participants with a quorum of 7.
let n = 10;
let quorum = quorum(n) as usize;
assert_eq!(quorum, 7);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(300));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let mut oracle =
start_test_network_with_peers(context.clone(), participants.clone(), false).await;
let mut registrations = register_validators(&mut oracle, &participants).await;
// Certificate builders. Each signs with `schemes[0..=quorum]`, i.e. the
// inclusive range yields quorum + 1 (8) votes, and assembles the
// certificate using `schemes[0]`.
let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
let votes: Vec<_> = (0..=quorum)
.map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
.collect();
TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
.expect("finalization quorum")
};
let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
let votes: Vec<_> = (0..=quorum)
.map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
.collect();
TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
.expect("notarization quorum")
};
let build_nullification = |round: Round| -> TNullification<_> {
let votes: Vec<_> = (0..=quorum)
.map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
.collect();
TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
.expect("nullification quorum")
};
// Base view that everyone will see finalized.
let f_view = 1;
let round_f = Round::new(Epoch::new(333), View::new(f_view));
let payload_b0 = Sha256::hash(b"B_F");
let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
// Proposal at view f_view + 1, built on view f_view (delivered to Group1 only).
let payload_b1a = Sha256::hash(b"B_G1");
let proposal_b1a = Proposal::new(
Round::new(Epoch::new(333), View::new(f_view + 1)),
View::new(f_view),
payload_b1a,
);
// Proposal at view f_view + 2, also built on view f_view (delivered to Group2 only).
let payload_b1b = Sha256::hash(b"B_G2");
let proposal_b1b = Proposal::new(
Round::new(Epoch::new(333), View::new(f_view + 2)),
View::new(f_view),
payload_b1b,
);
// Pre-build every certificate that will be injected.
let b0_notarization = build_notarization(&proposal_b0);
let b0_finalization = build_finalization(&proposal_b0);
let b1a_notarization = build_notarization(&proposal_b1a);
let b1b_notarization = build_notarization(&proposal_b1b);
let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
// Extra peer used only to deliver the certificates; its key is derived
// from a fixed seed rather than taken from `participants`.
// NOTE(review): channel `1` is presumably the channel on which engines
// receive certificates - confirm against `register_validators`.
let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
let (mut injector_sender, _inj_certificate_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
// Link the injector to every participant. `link_validators` has not been
// called yet, so these are the only links added explicitly so far.
for p in participants.iter() {
oracle
.add_link(injector_pk.clone(), p.clone(), link.clone())
.await
.unwrap();
}
// Register the participants and the injector with the oracle's manager as
// two separate sets.
// NOTE(review): the meaning of the id `1` and of the two `TrackedPeers`
// arguments is not visible here - confirm against the p2p mock.
oracle
.manager()
.track(
1,
TrackedPeers::new(
Set::from_iter_dedup(participants.iter().cloned()),
Set::from_iter_dedup(std::slice::from_ref(&injector_pk).iter().cloned()),
),
)
.await;
context.sleep(Duration::from_millis(10)).await;
// View f_view: notarization then finalization, broadcast to everyone.
let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
injector_sender
.send(Recipients::All, msg, true)
.await
.unwrap();
let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
injector_sender
.send(Recipients::All, msg, true)
.await
.unwrap();
// View f_view + 1: Group1 receives the notarization, everyone else the
// nullification.
let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
for (i, participant) in participants.iter().enumerate() {
let recipient = Recipients::One(participant.clone());
let msg = match get_type(i) {
ParticipantType::Group1 => notarization_msg.encode(),
_ => nullification_msg.encode(),
};
injector_sender.send(recipient, msg, true).await.unwrap();
}
// View f_view + 2: Group2 receives the notarization, everyone else the
// nullification.
let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
for (i, participant) in participants.iter().enumerate() {
let recipient = Recipients::One(participant.clone());
let msg = match get_type(i) {
ParticipantType::Group2 => notarization_msg.encode(),
_ => nullification_msg.encode(),
};
injector_sender.send(recipient, msg, true).await.unwrap();
}
// Start the participants only after all certificates have been sent.
let elector = L::default();
let relay = Arc::new(mocks::relay::Relay::new());
let mut honest_reporters = Vec::new();
for (idx, validator) in participants.iter().enumerate() {
let (pending, recovered, resolver) = registrations
.remove(validator)
.expect("validator should be registered");
let participant_type = get_type(idx);
if matches!(participant_type, ParticipantType::Byzantine) {
// Byzantine participants run the `NullifyOnly` mock, which is handed
// only the `pending` channel; the other two channels are dropped.
let cfg = mocks::nullify_only::Config {
scheme: schemes[idx].clone(),
};
let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
mocks::nullify_only::NullifyOnly::new(
context.with_label(&format!("byzantine_{}", *validator)),
cfg,
);
engine.start(pending);
drop(recovered);
drop(resolver);
} else {
// Honest participants (Group1, Group2, Ignorant) run the real engine.
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter = mocks::reporter::Reporter::new(
context.with_label(&format!("reporter_{}", *validator)),
reporter_config,
);
honest_reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (250.0, 50.0), verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label(&format!("application_{}", *validator)),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
// Leader and certification timeouts (10s) are longer than the 2s sleep
// that precedes the first round of assertions below.
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: validator.to_string(),
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine =
Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
engine.start(pending, recovered, resolver);
}
}
// Let the engines process the injected certificates.
context.sleep(Duration::from_secs(2)).await;
// View f_view: every honest reporter must have recorded the finalization.
let view = View::new(f_view);
for reporter in honest_reporters.iter() {
let finalizations = reporter.finalizations.lock();
assert!(finalizations.contains_key(&view));
}
// View f_view + 1: nobody finalized; Group1 holds only the notarization and
// everyone else only the nullification.
// `get_type(i)` is called with the position in `honest_reporters`; this
// matches the participant index because the Byzantine indices (7..10) come
// after all honest ones.
let view = View::new(f_view + 1);
for (i, reporter) in honest_reporters.iter().enumerate() {
let finalizations = reporter.finalizations.lock();
assert!(!finalizations.contains_key(&view));
let nullifications = reporter.nullifications.lock();
let notarizations = reporter.notarizations.lock();
match get_type(i) {
ParticipantType::Group1 => {
assert!(notarizations.contains_key(&view));
assert!(!nullifications.contains_key(&view));
}
_ => {
assert!(nullifications.contains_key(&view));
assert!(!notarizations.contains_key(&view));
}
}
}
// View f_view + 2: same check with Group2 as the notarization holders.
let view = View::new(f_view + 2);
for (i, reporter) in honest_reporters.iter().enumerate() {
let finalizations = reporter.finalizations.lock();
assert!(!finalizations.contains_key(&view));
let nullifications = reporter.nullifications.lock();
let notarizations = reporter.notarizations.lock();
match get_type(i) {
ParticipantType::Group2 => {
assert!(notarizations.contains_key(&view));
assert!(!nullifications.contains_key(&view));
}
_ => {
assert!(nullifications.contains_key(&view));
assert!(!notarizations.contains_key(&view));
}
}
}
// View f_view + 3: no honest reporter may have recorded nullify votes yet.
let next_view = View::new(f_view + 3);
for (i, reporter) in honest_reporters.iter().enumerate() {
let nullifies = reporter.nullifies.lock();
assert!(!nullifies.contains_key(&next_view), "reporter {i}");
}
// Only now connect the participants to one another, then wait until every
// honest reporter's monitor has reached view f_view + 3.
link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
{
let target = View::new(f_view + 3);
let mut finalizers = Vec::new();
for reporter in honest_reporters.iter_mut() {
let (mut latest, mut monitor) = reporter.subscribe().await;
finalizers.push(context.with_label("resume_finalizer").spawn(
move |_| async move {
while latest < target {
latest = monitor.recv().await.expect("event missing");
}
},
));
}
join_all(finalizers).await;
}
// Honest reporters must be clean and nobody may have been blocked.
for reporter in honest_reporters.iter() {
reporter.assert_no_faults();
reporter.assert_no_invalid();
}
let blocked = oracle.blocked().await.unwrap();
assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
});
}
#[test_group("slow")]
#[test_traced]
fn test_split_views_no_lockup() {
split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Encrypts a message to a future round and checks it can be decrypted with the
/// seed carried by that round's notarization.
///
/// Four engines using the BLS12-381 threshold VRF scheme (variant `V`) run with
/// a 30 second runtime budget. A 32-byte message is encrypted to epoch 333,
/// view 10 with `schemes[0].encrypt`. The test then polls validator 0's
/// reporter every 100ms until it holds a notarization for view 10, decrypts the
/// ciphertext with `notarization.seed()`, and asserts the plaintext round-trips.
// NOTE(review): the name suggests this exercises timelock encryption (TLE) -
// confirm against the `bls12381_threshold_vrf` scheme documentation.
fn tle<V, L>()
where
    V: Variant,
    L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
{
    // Scenario parameters.
    let n = 4;
    let namespace = b"consensus".to_vec();
    let activity_timeout = ViewDelta::new(100);
    let skip_timeout = ViewDelta::new(50);

    let runner = deterministic::Runner::timed(Duration::from_secs(30));
    runner.start(|mut context| async move {
        // Participants and one signing scheme per participant.
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);

        // Simulated network: start it, register every validator, then link them.
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;
        let reliable_link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(5),
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &participants,
            Action::Link(reliable_link),
            None,
        )
        .await;

        let elector = L::default();
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut reporters = Vec::new();
        // Engine handles are retained until the end of the test body.
        let mut engine_handlers = Vec::new();
        // Slot filled with validator 0's reporter, which is polled below.
        let monitor_reporter = Arc::new(Mutex::new(None));
        for (idx, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{validator}"));

            // Reporter observing this validator's consensus activity.
            let reporter_cfg = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
            reporters.push(reporter.clone());
            if idx == 0 {
                *monitor_reporter.lock() = Some(reporter.clone());
            }

            // Mock application acting as both automaton and relay.
            let app_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) =
                mocks::application::Application::new(context.with_label("application"), app_cfg);
            actor.start();

            // Engine under test.
            let blocker = oracle.control(validator.clone());
            let engine_cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: validator.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_millis(100),
                certification_timeout: Duration::from_millis(200),
                timeout_retry: Duration::from_millis(500),
                fetch_timeout: Duration::from_millis(100),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.with_label("engine"), engine_cfg);
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            let handle = engine.start(pending, recovered, resolver);
            engine_handlers.push(handle);
        }

        // Encrypt a 32-byte message to round (epoch 333, view 10).
        let target = Round::new(Epoch::new(333), View::new(10));
        let plaintext = b"Secret message for future view10";
        let ciphertext = schemes[0].encrypt(&mut context, target, *plaintext);

        // Poll validator 0's reporter until the target view has been notarized.
        let reporter = monitor_reporter.lock().clone().unwrap();
        loop {
            context.sleep(Duration::from_millis(100)).await;
            // The guard lives only for this iteration, so it is released
            // before the next sleep.
            let notarizations = reporter.notarizations.lock();
            if let Some(notarization) = notarizations.get(&target.view()) {
                let view_seed = notarization.seed();
                let decrypted = view_seed
                    .decrypt(&ciphertext)
                    .expect("Decryption should succeed with valid seed signature");
                assert_eq!(
                    plaintext,
                    decrypted.as_ref(),
                    "Decrypted message should match original message"
                );
                break;
            }
        }
    });
}
#[test_traced]
fn test_tle() {
tle::<MinPk, Random>();
tle::<MinSig, Random>();
}
/// Repeatedly aborts and restarts a randomly chosen validator while the
/// network keeps making progress, then audits what every reporter recorded.
///
/// Five validators run on a deterministic runtime seeded with `seed` and are
/// connected with a 10ms latency / 1ms jitter link whose success rate is 1.0.
/// Each of the `shutdowns` iterations:
///
/// 1. advances the target view by `interval` and waits until the monitor of
///    every reporter has reached it,
/// 2. aborts one randomly selected engine and waits (on the remaining
///    reporters only) for the next target,
/// 3. restarts that validator with the same partition name and the same
///    reporter, then waits for every reporter (including the restarted one)
///    to reach the next target.
///
/// After the last iteration, every view in
/// `View::range(View::new(1), target - activity_timeout)` is checked for each
/// reporter:
///
/// * at most one payload was notarized and at most one finalized per view,
/// * recorded notarizations/finalizations carry that payload,
/// * no signer both nullified and finalized the same view.
///
/// The function also asserts that reporters saw no faults or invalid
/// messages and that the network oracle blocked nobody.
///
/// Returns the runtime auditor state so callers can compare two runs started
/// from the same seed.
///
/// # Panics
///
/// Panics when any of the checks above fails.
fn hailstorm<S, F, L>(
    seed: u64,
    shutdowns: usize,
    interval: ViewDelta,
    mut fixture: F,
) -> String
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    // Create context
    let n = 5;
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();
    let cfg = deterministic::Config::new().with_seed(seed);
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Build the signing fixture for all participants
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);

        // Start the simulated network and register every validator on it
        let mut oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let mut registrations = register_validators(&mut oracle, &participants).await;

        // Link all validators
        let link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &participants, Action::Link(link), None).await;

        // Create engines (keyed by participant index so they can be removed
        // and re-inserted across restarts)
        let elector = L::default();
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut reporters = BTreeMap::new();
        let mut engine_handlers = BTreeMap::new();
        for (idx, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{}", *validator));

            // Configure the reporter
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.insert(idx, reporter.clone());

            // Configure the mock application
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();

            // Configure and start the engine
            let blocker = oracle.control(validator.clone());
            let cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: validator.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.with_label("engine"), cfg);
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
        }

        // Run shutdown/restart cycles
        let mut target = View::zero();
        for i in 0..shutdowns {
            // Wait for every reporter to reach the next target view
            target = target.saturating_add(interval);
            let mut finalizers = Vec::new();
            for (_, reporter) in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < target {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;
            target = target.saturating_add(interval);

            // Select a random engine to shut down
            let idx = context.gen_range(0..engine_handlers.len());
            let validator = &participants[idx];
            let handle = engine_handlers.remove(&idx).unwrap();
            handle.abort();
            let _ = handle.await;
            // Take the reporter out so the wait below only covers running validators
            let selected_reporter = reporters.remove(&idx).unwrap();
            info!(idx, ?validator, "shutdown validator");

            // Wait for the remaining validators to reach the next target
            let mut finalizers = Vec::new();
            for (_, reporter) in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < target {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;
            target = target.saturating_add(interval);

            // Restart the selected validator under a fresh context label
            info!(idx, ?validator, "restarting validator");
            let context =
                context.with_label(&format!("validator_{}_restarted_{}", *validator, i));

            // Re-register on the network to obtain new channels
            let (pending, recovered, resolver) =
                register_validator(&mut oracle, validator.clone()).await;
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            // Put the original reporter back so it is included in later waits and checks
            reporters.insert(idx, selected_reporter.clone());
            let blocker = oracle.control(validator.clone());
            let cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: selected_reporter,
                strategy: Sequential,
                // Same partition name as before the shutdown
                partition: validator.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.with_label("engine"), cfg);
            engine_handlers.insert(idx, engine.start(pending, recovered, resolver));

            // Wait for all validators (including the restarted one) to reach the target
            let mut finalizers = Vec::new();
            for (_, reporter) in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < target {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;
            info!(idx, ?validator, "validator recovered");
        }

        // Audit what each reporter recorded for views up to `target - activity_timeout`
        let latest_complete = target.saturating_sub(activity_timeout);
        for (_, reporter) in reporters.iter() {
            // Ensure no faults or invalid signatures
            reporter.assert_no_faults();
            reporter.assert_no_invalid();

            // Ensure only one payload was notarized per view
            let mut notarized = HashMap::new();
            let mut finalized = HashMap::new();
            {
                let notarizes = reporter.notarizes.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(payloads) = notarizes.get(&view) else {
                        continue;
                    };
                    if payloads.len() > 1 {
                        panic!("view: {view}");
                    }
                    let (digest, _) = payloads.iter().next().unwrap();
                    notarized.insert(view, *digest);
                }
            }

            // Ensure each notarization certificate matches the notarized payload
            {
                let notarizations = reporter.notarizations.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(notarization) = notarizations.get(&view) else {
                        continue;
                    };
                    let Some(digest) = notarized.get(&view) else {
                        continue;
                    };
                    assert_eq!(&notarization.proposal.payload, digest);
                }
            }

            // Ensure only one payload was finalized per view and that nobody
            // both nullified and finalized the same view
            {
                let finalizes = reporter.finalizes.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(payloads) = finalizes.get(&view) else {
                        continue;
                    };
                    if payloads.len() > 1 {
                        panic!("view: {view}");
                    }
                    let (digest, _) = payloads.iter().next().unwrap();
                    finalized.insert(view, *digest);

                    // NOTE(review): looks redundant given the range bound above;
                    // kept to preserve the original control flow.
                    if view > latest_complete {
                        continue;
                    }

                    let nullifies = reporter.nullifies.lock();
                    let Some(nullifies) = nullifies.get(&view) else {
                        continue;
                    };
                    for (_, finalizers) in payloads.iter() {
                        for finalizer in finalizers.iter() {
                            if nullifies.contains(finalizer) {
                                panic!("should not nullify and finalize at same view");
                            }
                        }
                    }
                }
            }

            // Ensure each finalization certificate matches the finalized payload
            {
                let finalizations = reporter.finalizations.lock();
                for view in View::range(View::new(1), latest_complete) {
                    let Some(finalization) = finalizations.get(&view) else {
                        continue;
                    };
                    let Some(digest) = finalized.get(&view) else {
                        continue;
                    };
                    assert_eq!(&finalization.proposal.payload, digest);
                }
            }
        }

        // Ensure no blocked connections
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());

        // Return the auditor state for determinism comparisons
        context.auditor().state()
    })
}
/// Two `hailstorm` runs with the threshold-VRF scheme (`MinPk`) and the
/// `Random` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
    let interval = ViewDelta::new(15);
    let first =
        hailstorm::<_, _, Random>(0, 10, interval, bls12381_threshold_vrf::fixture::<MinPk, _>);
    let second =
        hailstorm::<_, _, Random>(0, 10, interval, bls12381_threshold_vrf::fixture::<MinPk, _>);
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the threshold-VRF scheme (`MinSig`) and the
/// `Random` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
    let interval = ViewDelta::new(15);
    let first =
        hailstorm::<_, _, Random>(0, 10, interval, bls12381_threshold_vrf::fixture::<MinSig, _>);
    let second =
        hailstorm::<_, _, Random>(0, 10, interval, bls12381_threshold_vrf::fixture::<MinSig, _>);
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the standard threshold scheme (`MinPk`) and the
/// `RoundRobin` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_threshold_std_min_pk() {
    let interval = ViewDelta::new(15);
    let first = hailstorm::<_, _, RoundRobin>(
        0,
        10,
        interval,
        bls12381_threshold_std::fixture::<MinPk, _>,
    );
    let second = hailstorm::<_, _, RoundRobin>(
        0,
        10,
        interval,
        bls12381_threshold_std::fixture::<MinPk, _>,
    );
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the standard threshold scheme (`MinSig`) and the
/// `RoundRobin` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_threshold_std_min_sig() {
    let interval = ViewDelta::new(15);
    let first = hailstorm::<_, _, RoundRobin>(
        0,
        10,
        interval,
        bls12381_threshold_std::fixture::<MinSig, _>,
    );
    let second = hailstorm::<_, _, RoundRobin>(
        0,
        10,
        interval,
        bls12381_threshold_std::fixture::<MinSig, _>,
    );
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the BLS multisig scheme (`MinPk`) and the
/// `RoundRobin` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_multisig_min_pk() {
    let interval = ViewDelta::new(15);
    let first =
        hailstorm::<_, _, RoundRobin>(0, 10, interval, bls12381_multisig::fixture::<MinPk, _>);
    let second =
        hailstorm::<_, _, RoundRobin>(0, 10, interval, bls12381_multisig::fixture::<MinPk, _>);
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the BLS multisig scheme (`MinSig`) and the
/// `RoundRobin` elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_bls12381_multisig_min_sig() {
    let interval = ViewDelta::new(15);
    let first =
        hailstorm::<_, _, RoundRobin>(0, 10, interval, bls12381_multisig::fixture::<MinSig, _>);
    let second =
        hailstorm::<_, _, RoundRobin>(0, 10, interval, bls12381_multisig::fixture::<MinSig, _>);
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the ed25519 scheme and the `RoundRobin` elector
/// must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_ed25519() {
    let interval = ViewDelta::new(15);
    let first = hailstorm::<_, _, RoundRobin>(0, 10, interval, ed25519::fixture);
    let second = hailstorm::<_, _, RoundRobin>(0, 10, interval, ed25519::fixture);
    assert_eq!(first, second);
}
/// Two `hailstorm` runs with the secp256r1 scheme and the `RoundRobin`
/// elector must return identical state for the same seed.
#[test_group("slow")]
#[test_traced]
fn test_hailstorm_secp256r1() {
    let interval = ViewDelta::new(15);
    let first = hailstorm::<_, _, RoundRobin>(0, 10, interval, secp256r1::fixture);
    let second = hailstorm::<_, _, RoundRobin>(0, 10, interval, secp256r1::fixture);
    assert_eq!(first, second);
}
/// Parameters for one `twins_campaign` run.
#[derive(Clone, Copy, Debug)]
struct TwinsCampaign {
// Number of participants handed to the fixture; the twin count is
// `N3f1::max_faults(n)`.
n: u32,
// Passed through as `twins::Framework::rounds` when generating cases.
rounds: usize,
// Passed through as `twins::Framework::mode` when generating cases.
mode: twins::Mode,
// Passed through as `twins::Framework::max_cases` when generating cases.
max_cases: usize,
// Number of monitor events with a view beyond the scenario's rounds that
// each honest reporter must observe before the case is checked.
trailing_finalizations: usize,
}
/// Generates twins cases from `campaign` and runs each one on its own
/// deterministic runtime, checking safety properties from the point of view
/// of the honest validators.
///
/// In every case, each compromised participant index is run as two engines
/// ("primary" and "secondary") that share the same scheme and network
/// identity. Their vote and certificate channels are split so that outgoing
/// and incoming messages are routed according to the case's scenario. All
/// other participants run a single engine. Every engine gets an inert
/// resolver channel.
///
/// After each honest reporter has observed `campaign.trailing_finalizations`
/// monitor events for views beyond the scenario's rounds, the function
/// asserts that:
///
/// * honest reporters never recorded conflicting finalizations for a view,
/// * honest reporters recorded no invalid messages,
/// * no non-twin signer notarized or finalized two different payloads in a view,
/// * every recorded fault and every blocked peer is attributed to a twin.
///
/// `rng` seeds case generation and each case's runtime; `link` is applied
/// between all participants; `fixture` builds the participants and schemes.
fn twins_campaign<S, F, L>(
rng: &mut StdRng,
campaign: TwinsCampaign,
link: Link,
mut fixture: F,
) where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: Elector<S>,
{
// Generate the cases to run
let n = campaign.n;
let faults = N3f1::max_faults(n) as usize;
let cases = twins::cases(
rng,
twins::Framework {
participants: n as usize,
faults,
rounds: campaign.rounds,
mode: campaign.mode,
max_cases: campaign.max_cases,
},
);
assert!(
!cases.is_empty(),
"twins campaign should generate at least one case"
);
for case in cases {
let scenario = case.scenario.clone();
let twin_indices = case.compromised.clone();
assert_eq!(
twin_indices.len(),
faults,
"unexpected twins count for n={n} (expected f={faults})",
);
let activity_timeout = ViewDelta::new(10);
let skip_timeout = ViewDelta::new(5);
let namespace = b"consensus".to_vec();
let link = link.clone();
let trailing_finalizations = campaign.trailing_finalizations;
// Reborrow the fixture through a closure so it can be used again on the
// next case after this one moves into the async block
let mut case_fixture =
|ctx: &mut deterministic::Context, ns: &[u8], n: u32| fixture(ctx, ns, n);
// Each case gets its own runtime whose RNG is derived from the campaign RNG
let cfg = deterministic::Config::new()
.with_rng(Box::new(StdRng::from_rng(&mut *rng).unwrap()));
let executor = deterministic::Runner::new(cfg);
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = case_fixture(&mut context, &namespace, n);
let participants: Arc<[_]> = participants.into();
// Start the network, register every participant, and link them all
let mut oracle = start_test_network_with_peers(
context.clone(),
participants.iter().cloned(),
false,
)
.await;
let mut registrations = register_validators(&mut oracle, &participants).await;
link_validators(&mut oracle, &participants, Action::Link(link), None).await;
// Wrap the elector so leader selection follows the scenario
let elector = TwinsElector::new(L::default(), &scenario, n as usize);
let relay = Arc::new(mocks::relay::Relay::new());
let mut reporters = Vec::new();
let mut engine_handlers = Vec::new();
// Start the twins first; their reporters occupy the front of `reporters`
let twin_index_set: HashSet<usize> = twin_indices.iter().copied().collect();
for idx in twin_indices.iter().copied() {
let validator = &participants[idx];
let (
(vote_sender, vote_receiver),
(certificate_sender, certificate_receiver),
(_resolver_sender, _resolver_receiver),
) = registrations
.remove(validator)
.expect("validator should be registered");
// Outgoing votes: each twin half sends only to its scenario partition for the vote's view
let make_vote_forwarder = || {
let participants = participants.clone();
let scenario = scenario.clone();
move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
let (primary, secondary) =
scenario.partitions(msg.view(), participants.as_ref());
match origin {
SplitOrigin::Primary => Some(Recipients::Some(primary)),
SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
}
}
};
// Outgoing certificates: same partitioning, decoded with the scheme's codec config
let make_certificate_forwarder = || {
let codec = schemes[idx].certificate_codec_config();
let participants = participants.clone();
let scenario = scenario.clone();
move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
let msg: Certificate<S, D> =
Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
let (primary, secondary) =
scenario.partitions(msg.view(), participants.as_ref());
match origin {
SplitOrigin::Primary => Some(Recipients::Some(primary)),
SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
}
}
};
// Incoming votes: the scenario decides the route from the view and sender
let make_vote_router = || {
let participants = participants.clone();
let scenario = scenario.clone();
move |(sender, message): &(_, IoBuf)| {
let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
scenario.route(msg.view(), sender, participants.as_ref())
}
};
// Incoming certificates: routed the same way as incoming votes
let make_certificate_router = || {
let codec = schemes[idx].certificate_codec_config();
let participants = participants.clone();
let scenario = scenario.clone();
move |(sender, message): &(_, IoBuf)| {
let msg: Certificate<S, D> =
Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
scenario.route(msg.view(), sender, participants.as_ref())
}
};
// Split each channel into a primary and a secondary half
let (vote_sender_primary, vote_sender_secondary) =
vote_sender.split_with(make_vote_forwarder());
let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver
.split_with(
context.with_label(&format!("pending_split_{idx}")),
make_vote_router(),
);
let (certificate_sender_primary, certificate_sender_secondary) =
certificate_sender.split_with(make_certificate_forwarder());
let (certificate_receiver_primary, certificate_receiver_secondary) =
certificate_receiver.split_with(
context.with_label(&format!("recovered_split_{idx}")),
make_certificate_router(),
);
// Start one engine per twin half, each with its own reporter, application, and partition
for (twin_label, pending, recovered) in [
(
"primary",
(vote_sender_primary, vote_receiver_primary),
(certificate_sender_primary, certificate_receiver_primary),
),
(
"secondary",
(vote_sender_secondary, vote_receiver_secondary),
(certificate_sender_secondary, certificate_receiver_secondary),
),
] {
let label = format!("twin_{idx}_{twin_label}");
let context = context.with_label(&label);
let reporter_config = mocks::reporter::Config {
participants: participants.as_ref().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter = mocks::reporter::Reporter::new(
context.with_label("reporter"),
reporter_config,
);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: label,
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_millis(1_500),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
engine_handlers.push(engine.start(
pending,
recovered,
inert_channel(participants.as_ref()),
));
}
}
// Reporters from this index onward belong to honest validators
let honest_start = reporters.len();
// Start one engine for every participant that is not a twin
for (idx, validator) in participants.iter().enumerate() {
if twin_index_set.contains(&idx) {
continue;
}
let label = format!("honest_{idx}");
let context = context.with_label(&label);
let reporter_config = mocks::reporter::Config {
participants: participants.as_ref().try_into().unwrap(),
scheme: schemes[idx].clone(),
elector: elector.clone(),
};
let reporter = mocks::reporter::Reporter::new(
context.with_label("reporter"),
reporter_config,
);
reporters.push(reporter.clone());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: validator.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let blocker = oracle.control(validator.clone());
let cfg = config::Config {
scheme: schemes[idx].clone(),
elector: elector.clone(),
blocker,
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
strategy: Sequential,
partition: label,
mailbox_size: 1024,
epoch: Epoch::new(333),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_millis(1_500),
timeout_retry: Duration::from_secs(10),
fetch_timeout: Duration::from_secs(1),
activity_timeout,
skip_timeout,
fetch_concurrent: 4,
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), cfg);
let (
(pending_sender, pending_receiver),
(recovered_sender, recovered_receiver),
_,
) = registrations
.remove(validator)
.expect("validator should be registered");
engine_handlers.push(engine.start(
(pending_sender, pending_receiver),
(recovered_sender, recovered_receiver),
inert_channel(participants.as_ref()),
));
}
// Wait until every honest reporter has seen enough events past the scenario's rounds
let prefix_end = View::new(scenario.rounds().len() as u64);
let mut finalizers = Vec::new();
for (i, reporter) in reporters.iter_mut().skip(honest_start).enumerate() {
let (_latest, mut monitor) = reporter.subscribe().await;
let required = trailing_finalizations;
let label = format!("finalizer_{i}");
finalizers.push(context.with_label(&label).spawn(move |_| async move {
let mut count = 0usize;
while count < required {
let view = monitor.recv().await.expect("event missing");
if view > prefix_end {
count += 1;
}
}
}));
}
join_all(finalizers).await;
// Safety: honest reporters must agree on the finalized payload of every view
let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
for reporter in reporters.iter().skip(honest_start) {
let finalizations = reporter.finalizations.lock();
for (view, finalization) in finalizations.iter() {
let digest = finalization.proposal.payload;
if let Some(existing) = finalized_at_view.get(view) {
assert_eq!(
existing, &digest,
"safety violation: conflicting finalizations at view {view}"
);
} else {
finalized_at_view.insert(*view, digest);
}
}
}
// Honest reporters must not have recorded invalid messages
for reporter in reporters.iter().skip(honest_start) {
reporter.assert_no_invalid();
}
// Identities of the twins, used to exclude them from the honest-signer checks
let twin_identities: HashSet<_> = twin_indices
.iter()
.map(|idx| participants[*idx].clone())
.collect();
// Per view, the payload each non-twin signer notarized / finalized,
// accumulated across all honest reporters
let mut notarized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
BTreeMap::new();
let mut finalized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
BTreeMap::new();
for reporter in reporters.iter().skip(honest_start) {
let notarizes = reporter.notarizes.lock();
for (view, payloads) in notarizes.iter() {
let signers = notarized_by_honest_signer.entry(*view).or_default();
for (digest, payload_signers) in payloads.iter() {
for signer in payload_signers.iter() {
if twin_identities.contains(signer) {
continue;
}
// A previous entry for this signer must carry the same payload
if let Some(existing) = signers.insert(signer.clone(), *digest) {
assert_eq!(
existing, *digest,
"honest signer produced conflicting notarizes at view {view}"
);
}
}
}
}
let finalizes = reporter.finalizes.lock();
for (view, payloads) in finalizes.iter() {
let signers = finalized_by_honest_signer.entry(*view).or_default();
for (digest, payload_signers) in payloads.iter() {
for signer in payload_signers.iter() {
if twin_identities.contains(signer) {
continue;
}
// A previous entry for this signer must carry the same payload
if let Some(existing) = signers.insert(signer.clone(), *digest) {
assert_eq!(
existing, *digest,
"honest signer produced conflicting finalizes at view {view}"
);
}
}
}
}
}
// Any fault recorded by an honest reporter must be attributed to a twin
for reporter in reporters.iter().skip(honest_start) {
let faults = reporter.faults.lock();
for (faulter, _) in faults.iter() {
assert!(
twin_identities.contains(faulter),
"fault from non-twin participant"
);
}
}
// Any peer reported as blocked by the oracle must be a twin
let blocked = oracle.blocked().await.unwrap();
for (_, faulter) in blocked {
assert!(
twin_identities.contains(&faulter),
"blocked peer attributed to non-twin participant"
);
}
});
}
}
/// Baseline campaign shared by the twins tests; individual tests override
/// fields with struct-update syntax where they need a different setup.
const TWINS_CAMPAIGN: TwinsCampaign = TwinsCampaign {
n: 5,
rounds: 3,
mode: twins::Mode::Sampled,
max_cases: 20,
trailing_finalizations: 10,
};
/// Link used by most twins tests: 500ms latency, 500ms jitter, and a success
/// rate of 1.0.
const TWINS_LINK: Link = Link {
latency: Duration::from_millis(500),
jitter: Duration::from_millis(500),
success_rate: 1.0,
};
/// Runs the baseline (sampled) campaign twice: once over a 10ms link and once
/// over `TWINS_LINK`, each time with a fresh test RNG.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_sampled() {
    let fast_link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(10),
        success_rate: 1.0,
    };
    for link in [fast_link, TWINS_LINK] {
        let mut rng = test_rng();
        twins_campaign::<_, _, RoundRobin>(&mut rng, TWINS_CAMPAIGN, link, scheme_mocks::fixture);
    }
}
/// Runs the baseline campaign in `Sustained` mode twice: once over a 10ms
/// link and once over `TWINS_LINK`, each time with a fresh test RNG.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_sustained() {
    let campaign = TwinsCampaign {
        mode: twins::Mode::Sustained,
        ..TWINS_CAMPAIGN
    };
    let fast_link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(10),
        success_rate: 1.0,
    };
    for link in [fast_link, TWINS_LINK] {
        let mut rng = test_rng();
        twins_campaign::<_, _, RoundRobin>(&mut rng, campaign, link, scheme_mocks::fixture);
    }
}
/// Runs the sampled campaign with 10 participants and 5 rounds over
/// `TWINS_LINK`.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_large_sampled() {
    let mut rng = test_rng();
    let campaign = TwinsCampaign {
        n: 10,
        rounds: 5,
        ..TWINS_CAMPAIGN
    };
    twins_campaign::<_, _, RoundRobin>(&mut rng, campaign, TWINS_LINK, scheme_mocks::fixture);
}
/// Runs the `Sustained` campaign with 10 participants and 5 rounds over
/// `TWINS_LINK`.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_large_sustained() {
    let mut rng = test_rng();
    let campaign = TwinsCampaign {
        n: 10,
        rounds: 5,
        mode: twins::Mode::Sustained,
        ..TWINS_CAMPAIGN
    };
    twins_campaign::<_, _, RoundRobin>(&mut rng, campaign, TWINS_LINK, scheme_mocks::fixture);
}
/// Baseline twins campaign using the BLS multisig scheme (`MinPk`).
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_multisig_min_pk() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_multisig::fixture::<MinPk, _>,
    );
}
/// Baseline twins campaign using the BLS multisig scheme (`MinSig`).
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_multisig_min_sig() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_multisig::fixture::<MinSig, _>,
    );
}
/// Baseline twins campaign using the threshold-VRF scheme (`MinPk`) with the
/// `Random` elector.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_threshold_vrf_min_pk() {
    let mut rng = test_rng();
    twins_campaign::<_, _, Random>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
}
/// Baseline twins campaign using the threshold-VRF scheme (`MinSig`) with the
/// `Random` elector.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_threshold_vrf_min_sig() {
    let mut rng = test_rng();
    twins_campaign::<_, _, Random>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
}
/// Baseline twins campaign using the standard threshold scheme (`MinPk`).
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_threshold_std_min_pk() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_threshold_std::fixture::<MinPk, _>,
    );
}
/// Baseline twins campaign using the standard threshold scheme (`MinSig`).
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_threshold_std_min_sig() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(
        &mut rng,
        TWINS_CAMPAIGN,
        TWINS_LINK,
        bls12381_threshold_std::fixture::<MinSig, _>,
    );
}
/// Baseline twins campaign using the ed25519 scheme.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_ed25519() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(&mut rng, TWINS_CAMPAIGN, TWINS_LINK, ed25519::fixture);
}
/// Baseline twins campaign using the secp256r1 scheme.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_twins_secp256r1() {
    let mut rng = test_rng();
    twins_campaign::<_, _, RoundRobin>(&mut rng, TWINS_CAMPAIGN, TWINS_LINK, secp256r1::fixture);
}
}