mod actor;
mod ingress;
mod round;
mod slot;
mod state;
use crate::{
simplex::{elector::Config as Elector, types::Activity, Plan},
types::{Epoch, ViewDelta},
CertifiableAutomaton, Relay, Reporter,
};
pub use actor::Actor;
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_p2p::Blocker;
use commonware_runtime::buffer::paged::CacheRef;
pub use ingress::Mailbox;
#[cfg(test)]
pub use ingress::Message;
use std::{num::NonZeroUsize, time::Duration};
/// Configuration handed to [`Actor::new`] to construct a voter.
///
/// Type parameters:
/// - `S`: certificate [`Scheme`] used by the voter.
/// - `L`: leader elector configuration for `S`.
/// - `B`: [`Blocker`] implementation.
/// - `D`: [`Digest`] type of proposed payloads.
/// - `A`: application implementing [`CertifiableAutomaton`].
/// - `R`: [`Relay`] whose digest, public key and plan types line up with `D` and `S`.
/// - `F`: [`Reporter`] that receives [`Activity`] for `S` and `D`.
pub struct Config<S, L, B, D, A, R, F>
where
    S: Scheme,
    L: Elector<S>,
    B: Blocker,
    D: Digest,
    A: CertifiableAutomaton,
    R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
    F: Reporter<Activity = Activity<S, D>>,
{
    /// Certificate scheme instance.
    pub scheme: S,
    /// Leader elector configuration.
    pub elector: L,
    /// Peer blocker handle.
    pub blocker: B,
    /// Application automaton.
    pub automaton: A,
    /// Relay handle.
    pub relay: R,
    /// Sink for consensus activity.
    pub reporter: F,
    /// Partition name (presumably the storage partition for the voter's journal;
    /// confirm against `actor`).
    pub partition: String,
    /// Epoch value for this voter.
    pub epoch: Epoch,
    /// Mailbox size (presumably the capacity of the channel behind [`Mailbox`];
    /// confirm against `ingress`).
    pub mailbox_size: usize,
    /// Leader timeout. NOTE(review): exact semantics live in `actor`/`state`.
    pub leader_timeout: Duration,
    /// Certification timeout. NOTE(review): exact semantics live in `actor`/`state`.
    pub certification_timeout: Duration,
    /// Timeout retry interval. NOTE(review): exact semantics live in `actor`/`state`.
    pub timeout_retry: Duration,
    /// Activity timeout expressed as a number of views.
    pub activity_timeout: ViewDelta,
    /// Replay buffer size (non-zero).
    pub replay_buffer: NonZeroUsize,
    /// Write buffer size (non-zero).
    pub write_buffer: NonZeroUsize,
    /// Shared page cache reference.
    pub page_cache: CacheRef,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
simplex::{
actors::{
batcher,
resolver::{self, MailboxMessage},
},
elector::{Config as ElectorConfig, Elector, Random, RoundRobin, RoundRobinElector},
metrics::TimeoutReason,
mocks, quorum,
scheme::{
bls12381_multisig, bls12381_threshold::vrf as bls12381_threshold_vrf, ed25519,
secp256r1, Scheme,
},
types::{
Certificate, Finalization, Finalize, Notarization, Notarize, Nullification,
Nullify, Proposal, Vote,
},
},
types::{Participant, Round, View},
Viewable,
};
use commonware_codec::{DecodeExt, Encode};
use commonware_cryptography::{
bls12381::primitives::variant::{MinPk, MinSig},
certificate::mocks::Fixture,
ed25519::PublicKey,
sha256::Digest as Sha256Digest,
Hasher as _, Sha256,
};
use commonware_macros::{select, test_collect_traces, test_traced};
use commonware_p2p::simulated::{Config as NConfig, Link, Network, Oracle};
use commonware_parallel::Sequential;
use commonware_runtime::{
deterministic, telemetry::traces::collector::TraceStorage, Clock, Metrics, Quota, Runner,
};
use commonware_utils::{channel::mpsc, sync::Mutex, NZUsize, NZU16};
use futures::FutureExt;
use std::{
num::{NonZeroU16, NonZeroU32},
sync::Arc,
time::Duration,
};
use tracing::Level;
// Second argument to every `CacheRef::from_pooler` call in these tests.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
// Third argument to every `CacheRef::from_pooler` call in these tests
// (presumably the cache capacity in pages; confirm against `CacheRef`).
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
// Quota passed to every channel `register` call; built from the largest `NonZeroU32`.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Creates a simulated network seeded with `peers`, starts it, and returns its oracle.
///
/// `disconnect_on_block` is forwarded unchanged into the network configuration.
async fn start_test_network_with_peers<I>(
    context: deterministic::Context,
    peers: I,
    disconnect_on_block: bool,
) -> Oracle<PublicKey, deterministic::Context>
where
    I: IntoIterator<Item = PublicKey>,
{
    let network_context = context.with_label("network");
    let network_config = NConfig {
        max_size: 1024 * 1024,
        disconnect_on_block,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, oracle) =
        Network::new_with_peers(network_context, network_config, peers).await;
    network.start();
    oracle
}
/// Signs `proposal` with the first `count` entries of `schemes` and assembles the
/// resulting notarize votes into a notarization certificate.
///
/// Returns the individual votes together with the certificate.
///
/// # Panics
///
/// Panics if any signature fails, if `schemes` is empty, or if the votes do not
/// produce a certificate.
fn build_notarization<S: Scheme<Sha256Digest>>(
    schemes: &[S],
    proposal: &Proposal<Sha256Digest>,
    count: u32,
) -> (
    Vec<Notarize<S, Sha256Digest>>,
    Notarization<S, Sha256Digest>,
) {
    let mut notarizes: Vec<Notarize<S, Sha256Digest>> = Vec::new();
    for signer in schemes.iter().take(count as usize) {
        notarizes.push(Notarize::sign(signer, proposal.clone()).unwrap());
    }
    // The first scheme doubles as the assembler of the certificate.
    let notarization = Notarization::from_notarizes(&schemes[0], &notarizes, &Sequential)
        .expect("notarization requires a quorum of votes");
    (notarizes, notarization)
}
/// Signs `proposal` with the first `count` entries of `schemes` and assembles the
/// resulting finalize votes into a finalization certificate.
///
/// Returns the individual votes together with the certificate.
///
/// # Panics
///
/// Panics if any signature fails, if `schemes` is empty, or if the votes do not
/// produce a certificate.
fn build_finalization<S: Scheme<Sha256Digest>>(
    schemes: &[S],
    proposal: &Proposal<Sha256Digest>,
    count: u32,
) -> (
    Vec<Finalize<S, Sha256Digest>>,
    Finalization<S, Sha256Digest>,
) {
    let mut finalizes: Vec<Finalize<S, Sha256Digest>> = Vec::new();
    for signer in schemes.iter().take(count as usize) {
        finalizes.push(Finalize::sign(signer, proposal.clone()).unwrap());
    }
    // The first scheme doubles as the assembler of the certificate.
    let finalization = Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential)
        .expect("finalization requires a quorum of votes");
    (finalizes, finalization)
}
/// Signs a nullify vote for `round` with the first `count` entries of `schemes`
/// and assembles the votes into a nullification certificate.
///
/// Returns the individual votes together with the certificate.
///
/// # Panics
///
/// Panics if any signature fails, if `schemes` is empty, or if the votes do not
/// produce a certificate.
fn build_nullification<S: Scheme<Sha256Digest>>(
    schemes: &[S],
    round: Round,
    count: u32,
) -> (Vec<Nullify<S>>, Nullification<S>) {
    let mut nullifies: Vec<Nullify<S>> = Vec::new();
    for signer in schemes.iter().take(count as usize) {
        nullifies.push(Nullify::sign::<Sha256Digest>(signer, round).unwrap());
    }
    // The first scheme doubles as the assembler of the certificate.
    let nullification = Nullification::from_nullifies(&schemes[0], &nullifies, &Sequential)
        .expect("nullification requires a quorum of votes");
    (nullifies, nullification)
}
/// Spins up a voter for the first participant together with its mock reporter,
/// relay and application, and returns the handles a test needs to drive it.
///
/// The voter signs with `schemes[0]`, identifies as `participants[0]`, runs in
/// epoch 333 and uses the supplied timeouts. The batcher and resolver mailboxes
/// are backed by local channels whose receiving ends are returned, so the test
/// observes everything the voter sends to those actors.
///
/// Returns `(voter mailbox, batcher receiver, resolver receiver, relay, reporter)`.
// The argument list mirrors the individual knobs tests want to vary.
#[allow(clippy::too_many_arguments)]
async fn setup_voter<S, L>(
    context: &mut deterministic::Context,
    oracle: &commonware_p2p::simulated::Oracle<S::PublicKey, deterministic::Context>,
    participants: &[S::PublicKey],
    schemes: &[S],
    elector: L,
    leader_timeout: Duration,
    certification_timeout: Duration,
    timeout_retry: Duration,
    should_certify: mocks::application::Certifier<Sha256Digest>,
) -> (
    Mailbox<S, Sha256Digest>,
    mpsc::Receiver<batcher::Message<S, Sha256Digest>>,
    mpsc::Receiver<resolver::MailboxMessage<S, Sha256Digest>>,
    Arc<mocks::relay::Relay<Sha256Digest, S::PublicKey>>,
    mocks::reporter::Reporter<deterministic::Context, S, L, Sha256Digest>,
)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    L: ElectorConfig<S>,
{
    let signer = schemes[0].clone();
    let validator = participants[0].clone();

    // Mock reporter.
    let reporter = {
        let config = mocks::reporter::Config {
            participants: participants.to_vec().try_into().unwrap(),
            scheme: signer.clone(),
            elector: elector.clone(),
        };
        mocks::reporter::Reporter::new(context.with_label("reporter"), config)
    };

    // Mock application (also used as the relay passed to the voter).
    let relay = Arc::new(mocks::relay::Relay::new());
    let (application_actor, application) = {
        let config = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: validator.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify,
        };
        mocks::application::Application::new(context.with_label("app"), config)
    };
    application_actor.start();

    // Voter under test.
    let config = Config {
        scheme: signer.clone(),
        elector,
        blocker: oracle.control(validator.clone()),
        automaton: application.clone(),
        relay: application.clone(),
        reporter: reporter.clone(),
        partition: format!("voter_test_{validator}"),
        epoch: Epoch::new(333),
        mailbox_size: 128,
        leader_timeout,
        certification_timeout,
        timeout_retry,
        activity_timeout: ViewDelta::new(10),
        replay_buffer: NZUsize!(10240),
        write_buffer: NZUsize!(10240),
        page_cache: CacheRef::from_pooler(context, PAGE_SIZE, PAGE_CACHE_SIZE),
    };
    let (voter, mailbox) = Actor::new(context.clone(), config);

    // Local channels stand in for the resolver and batcher actors.
    let (resolver_tx, resolver_rx) = mpsc::channel(8);
    let (batcher_tx, batcher_rx) = mpsc::channel(16);

    // Network channels: 0 carries votes, 1 carries certificates. The receiving
    // halves are discarded.
    let (votes, _) = oracle
        .control(validator.clone())
        .register(0, TEST_QUOTA)
        .await
        .unwrap();
    let (certificates, _) = oracle
        .control(validator.clone())
        .register(1, TEST_QUOTA)
        .await
        .unwrap();

    let batcher_mailbox = batcher::Mailbox::new(batcher_tx);
    let resolver_mailbox = resolver::Mailbox::new(resolver_tx);
    voter.start(batcher_mailbox, resolver_mailbox, votes, certificates);

    (mailbox, batcher_rx, resolver_rx, relay, reporter)
}
/// Moves the voter to `target` by resolving a finalization for the view just
/// before it, then answers batcher updates until the voter reports `target`.
///
/// The finalized proposal lives in epoch 333, its parent is the view before it
/// (or view zero when there is none), and its payload is the SHA-256 hash of the
/// finalized view number in big-endian bytes. That payload is returned.
///
/// # Panics
///
/// Panics if `target` has no previous view, if the batcher channel closes, or if
/// the voter reports a current view beyond `target`.
async fn advance_to_view<S: Scheme<Sha256Digest>>(
    mailbox: &mut Mailbox<S, Sha256Digest>,
    batcher_receiver: &mut mpsc::Receiver<batcher::Message<S, Sha256Digest>>,
    schemes: &[S],
    quorum: u32,
    target: View,
) -> Sha256Digest {
    let finalized_view = target.previous().expect("target view must be > 0");
    let parent_view = finalized_view.previous().unwrap_or(View::zero());
    let payload = Sha256::hash(finalized_view.get().to_be_bytes().as_slice());
    let proposal = Proposal::new(
        Round::new(Epoch::new(333), finalized_view),
        parent_view,
        payload,
    );
    let (_, finalization) = build_finalization(schemes, &proposal, quorum);
    mailbox
        .resolved(Certificate::Finalization(finalization))
        .await;

    // Acknowledge every update; stop at the first one that is not below `target`.
    let reached = loop {
        let batcher::Message::Update {
            current, response, ..
        } = batcher_receiver.recv().await.unwrap()
        else {
            // Constructed votes are irrelevant here.
            continue;
        };
        response.send(None).unwrap();
        if current < target {
            continue;
        }
        break current;
    };
    assert_eq!(reached, target);
    payload
}
/// Scenario: a notarization for a view below the last finalized view is delivered
/// to the voter between two finalizations.
///
/// Steps, as driven by the body:
/// 1. Start a voter for participant 0 out of 5 and acknowledge its first update
///    (current view 1, finalized view 0).
/// 2. Deliver a finalization for view 100 and expect an update to view 101 plus
///    the finalization being forwarded to the resolver.
/// 3. Deliver a notarization for view 50 (already below the finalized view).
/// 4. Deliver a finalization for view 300 and expect an update to view 301; the
///    very next resolver message must be that finalization, i.e. the view-50
///    notarization must not have produced a resolver message in between.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn stale_backfill<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|mut context| async move {
// Participants, schemes and simulated network.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), false).await;
// The voter under test acts as the first participant.
let me = participants[0].clone();
let elector = L::default();
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
certify_latency: (10.0, 5.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
// Voter configuration; the mock application serves as both automaton and relay.
let cfg = Config {
scheme: schemes[0].clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "test".to_string(),
epoch: Epoch::new(333),
mailbox_size: 10,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NonZeroUsize::new(1024 * 1024).unwrap(),
write_buffer: NonZeroUsize::new(1024 * 1024).unwrap(),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (actor, mut mailbox) = Actor::new(context.clone(), cfg);
// Local channels stand in for the resolver and batcher actors so the test
// can observe what the voter sends them.
let (resolver_sender, mut resolver_receiver) = mpsc::channel(10);
let resolver = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(1024);
let batcher = batcher::Mailbox::new(batcher_sender);
// Network channels 0 (votes) and 1 (certificates); receivers are kept alive
// but never read.
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
actor.start(batcher, resolver, vote_sender, certificate_sender);
// The first batcher message must be the initial update.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(1));
assert_eq!(finalized, View::new(0));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// Finalization for view 100 (parent 50).
let payload = Sha256::hash(b"test");
let proposal = Proposal::new(
Round::new(Epoch::new(333), View::new(100)),
View::new(50),
payload,
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
// Skip non-update messages; the first update must report view 101 / finalized 100.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(101));
assert_eq!(finalized, View::new(100));
response.send(None).unwrap();
break;
}
_ => {
continue;
}
}
}
// The finalization is forwarded to the resolver.
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization.view(), View::new(100));
}
_ => panic!("unexpected resolver message"),
}
// Notarization for view 50, which is below the finalized view.
let payload = Sha256::hash(b"test2");
let proposal = Proposal::new(
Round::new(Epoch::new(333), View::new(50)),
View::new(49),
payload,
);
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Notarization(notarization))
.await;
// Finalization for view 300 (parent 100).
let payload = Sha256::hash(b"test3");
let proposal = Proposal::new(
Round::new(Epoch::new(333), View::new(300)),
View::new(100),
payload,
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
// The next update must jump straight to view 301 / finalized 300.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(301));
assert_eq!(finalized, View::new(300));
response.send(None).unwrap();
break;
}
_ => {
continue;
}
}
}
// The next resolver message must be the view-300 finalization; anything else
// (including the view-50 notarization) fails the test.
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization.view(), View::new(300));
}
_ => panic!("unexpected resolver message"),
}
});
}
/// Runs `stale_backfill` once per signing fixture: the threshold-VRF fixtures use
/// the `Random` elector, all others use `RoundRobin`.
#[test_traced]
fn test_stale_backfill() {
stale_backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
stale_backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
stale_backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
stale_backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
stale_backfill::<_, _, RoundRobin>(ed25519::fixture);
stale_backfill::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: after finalizing view 50 with an activity timeout of 10 views, the
/// voter receives notarizations for two older views that still lie within the
/// activity window (45, then 42), followed by a finalization for view 100.
///
/// Each certificate must be forwarded to the resolver in the order delivered, and
/// the voter must keep running through the final jump to view 101.
/// NOTE(review): the namespace `test_prune_panic` suggests this is a regression
/// test for a panic while pruning; confirm against the actor's journal handling.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn append_old_interesting_view<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"test_prune_panic".to_vec();
let activity_timeout = ViewDelta::new(10);
let executor = deterministic::Runner::timed(Duration::from_secs(20));
executor.start(|mut context| async move {
// Participants, schemes and simulated network.
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// The voter under test acts as the first participant.
let signing = schemes[0].clone();
let me = participants[0].clone();
let elector = L::default();
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let relay = Arc::new(mocks::relay::Relay::new());
let app_config = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) =
mocks::application::Application::new(context.with_label("app"), app_config);
actor.start();
// Voter configuration; the mock application serves as both automaton and relay.
let voter_config = Config {
scheme: signing.clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: format!("voter_actor_test_{me}"),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_millis(500),
certification_timeout: Duration::from_millis(1000),
timeout_retry: Duration::from_millis(1000),
activity_timeout,
replay_buffer: NZUsize!(10240),
write_buffer: NZUsize!(10240),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (actor, mut mailbox) = Actor::new(context.clone(), voter_config);
// Local channels stand in for the resolver and batcher actors.
let (resolver_sender, mut resolver_receiver) = mpsc::channel(10);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(10);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
// Network channels 0 (votes) and 1 (certificates); receivers are kept alive
// but never read.
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
actor.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// The first batcher message must be the initial update.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(1));
assert_eq!(finalized, View::new(0));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// Finalize view 50; the second target is 50 - 10 + 5 = view 45.
let lf_target = View::new(50);
let journal_floor_target = lf_target
.saturating_sub(activity_timeout)
.saturating_add(ViewDelta::new(5));
let proposal_lf = Proposal::new(
Round::new(Epoch::new(333), lf_target),
lf_target.previous().unwrap(),
Sha256::hash(b"test"),
);
let (_, finalization) = build_finalization(&schemes, &proposal_lf, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
// Skip non-update messages; the first update must report view 51 / finalized 50.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(51));
assert_eq!(finalized, View::new(50));
response.send(None).unwrap();
break;
}
_ => {
continue;
}
}
}
// The finalization is forwarded to the resolver.
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization.view(), View::new(50));
}
_ => panic!("unexpected resolver message"),
}
// Notarization for view 45: older than the finalized view but still forwarded.
let proposal_jft = Proposal::new(
Round::new(Epoch::new(333), journal_floor_target),
journal_floor_target.previous().unwrap(),
Sha256::hash(b"test2"),
);
let (_, notarization_for_floor) = build_notarization(&schemes, &proposal_jft, quorum);
mailbox
.recovered(Certificate::Notarization(notarization_for_floor))
.await;
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Notarization(notarization)) => {
assert_eq!(notarization.view(), journal_floor_target);
}
_ => panic!("unexpected resolver message"),
}
// Notarization for view 45 - 3 = 42, i.e. older than the one just processed.
let problematic_view = journal_floor_target.saturating_sub(ViewDelta::new(3));
let proposal_bft = Proposal::new(
Round::new(Epoch::new(333), problematic_view),
problematic_view.previous().unwrap(),
Sha256::hash(b"test3"),
);
let (_, notarization_for_bft) = build_notarization(&schemes, &proposal_bft, quorum);
mailbox
.recovered(Certificate::Notarization(notarization_for_bft))
.await;
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Notarization(notarization)) => {
assert_eq!(notarization.view(), problematic_view);
}
_ => panic!("unexpected resolver message"),
}
// Finalize view 100; the voter must still be alive to process it.
let proposal_lf = Proposal::new(
Round::new(Epoch::new(333), View::new(100)),
View::new(99),
Sha256::hash(b"test4"),
);
let (_, finalization) = build_finalization(&schemes, &proposal_lf, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
// The next update must report view 101 / finalized 100.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(101));
assert_eq!(finalized, View::new(100));
response.send(None).unwrap();
break;
}
_ => {
continue;
}
}
}
// The view-100 finalization is forwarded to the resolver.
let msg = resolver_receiver
.recv()
.await
.expect("failed to receive resolver message");
match msg {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization.view(), View::new(100));
}
_ => panic!("unexpected resolver message"),
}
});
}
/// Runs `append_old_interesting_view` once per signing fixture: the threshold-VRF
/// fixtures use the `Random` elector, all others use `RoundRobin`.
#[test_traced]
fn test_append_old_interesting_view() {
append_old_interesting_view::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
append_old_interesting_view::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
append_old_interesting_view::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
append_old_interesting_view::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
append_old_interesting_view::<_, _, RoundRobin>(ed25519::fixture);
append_old_interesting_view::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: the voter receives a finalization certificate for view 2 without ever
/// having seen a notarization for that view.
///
/// The finalization must be forwarded to the resolver and reported to the
/// reporter, while the reporter's notarization map stays empty.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn finalization_without_notarization_certificate<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let validators = 5;
    let threshold = quorum(validators);
    let namespace = b"finalization_without_notarization".to_vec();
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|mut context| async move {
        // Participants, schemes and simulated network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, validators);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Voter for participant 0 with a short leader timeout and long other timeouts.
        let (mut mailbox, mut batcher_receiver, mut resolver_receiver, _, reporter) =
            setup_voter(
                &mut context,
                &oracle,
                &participants,
                &schemes,
                L::default(),
                Duration::from_millis(500),
                Duration::from_secs(1000),
                Duration::from_secs(1000),
                mocks::application::Certifier::Sometimes,
            )
            .await;

        // The first batcher message must be the initial update.
        let batcher::Message::Update {
            current,
            finalized,
            response,
            ..
        } = batcher_receiver.recv().await.unwrap()
        else {
            panic!("unexpected batcher message");
        };
        assert_eq!(current, View::new(1));
        assert_eq!(finalized, View::new(0));
        response.send(None).unwrap();

        // Deliver a finalization for view 2 only.
        let view = View::new(2);
        let proposal = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"finalize_without_notarization"),
        );
        let (_, expected_finalization) = build_finalization(&schemes, &proposal, threshold);
        mailbox
            .recovered(Certificate::Finalization(expected_finalization.clone()))
            .await;

        // Wait for the first finalization forwarded to the resolver.
        let mut finalized_view = None;
        while let Some(message) = resolver_receiver.recv().await {
            if let MailboxMessage::Certificate(Certificate::Finalization(finalization)) = message
            {
                finalized_view = Some(finalization.view());
                break;
            }
        }
        assert_eq!(finalized_view, Some(view));

        // Poll the reporter until it has recorded exactly that finalization. The
        // lock guard is released before sleeping.
        loop {
            let reported =
                reporter.finalizations.lock().get(&view) == Some(&expected_finalization);
            if reported {
                break;
            }
            context.sleep(Duration::from_millis(10)).await;
        }

        // No notarization may have been reported.
        assert!(reporter.notarizations.lock().is_empty());
    });
}
/// Runs `finalization_without_notarization_certificate` once per signing fixture:
/// the threshold-VRF fixtures use the `Random` elector, all others use `RoundRobin`.
#[test_traced]
fn test_finalization_without_notarization_certificate() {
finalization_without_notarization_certificate::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
finalization_without_notarization_certificate::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
finalization_without_notarization_certificate::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
finalization_without_notarization_certificate::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
finalization_without_notarization_certificate::<_, _, RoundRobin>(ed25519::fixture);
finalization_without_notarization_certificate::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: the voter first receives proposal A for view 2 and then a
/// notarization certificate for a different proposal B in the same view.
///
/// The notarization for B must be forwarded to the resolver and recorded by the
/// reporter, and the voter must not construct a finalize vote afterwards.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn certificate_conflicts_proposal<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"certificate_conflicts_proposal_test".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        // Participants, schemes and simulated network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Voter for participant 0.
        let elector = L::default();
        let (mut mailbox, mut batcher_receiver, mut resolver_receiver, _, reporter) =
            setup_voter(
                &mut context,
                &oracle,
                &participants,
                &schemes,
                elector,
                Duration::from_millis(500),
                Duration::from_secs(1000),
                Duration::from_secs(1000),
                mocks::application::Certifier::Sometimes,
            )
            .await;

        // The first batcher message must be the initial update.
        let message = batcher_receiver.recv().await.unwrap();
        match message {
            batcher::Message::Update {
                current,
                leader: _,
                finalized,
                response,
                ..
            } => {
                assert_eq!(current, View::new(1));
                assert_eq!(finalized, View::new(0));
                response.send(None).unwrap();
            }
            _ => panic!("unexpected batcher message"),
        }

        // Proposal A for view 2 arrives first.
        let view = View::new(2);
        let proposal_a = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"proposal_a"),
        );
        mailbox.proposal(proposal_a.clone()).await;

        // Give the voter time to process the proposal.
        context.sleep(Duration::from_millis(10)).await;

        // A notarization for the conflicting proposal B in the same view follows.
        let proposal_b = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"proposal_b"),
        );
        let (_, notarization_b) = build_notarization(&schemes, &proposal_b, quorum);
        mailbox
            .recovered(Certificate::Notarization(notarization_b.clone()))
            .await;

        // The certificate for B is forwarded to the resolver unchanged.
        let msg = resolver_receiver
            .recv()
            .await
            .expect("failed to receive resolver message");
        match msg {
            MailboxMessage::Certificate(Certificate::Notarization(notarization)) => {
                assert_eq!(notarization.proposal, proposal_b);
                assert_eq!(notarization, notarization_b);
            }
            _ => panic!("unexpected resolver message"),
        }

        // Poll the reporter until it has recorded the notarization for B. The lock
        // guard is scoped so it is released before sleeping.
        loop {
            {
                let notarizations = reporter.notarizations.lock();
                if matches!(
                    notarizations.get(&view),
                    Some(notarization) if notarization == &notarization_b
                ) {
                    break;
                }
            }
            context.sleep(Duration::from_millis(10)).await;
        }

        // Leave time for any (unwanted) finalize vote to be constructed.
        context.sleep(Duration::from_millis(100)).await;

        // Drain whatever is already queued on the batcher channel without blocking;
        // a finalize vote among the queued messages fails the test.
        loop {
            let Some(Some(message)) = batcher_receiver.recv().now_or_never() else {
                break;
            };
            match message {
                batcher::Message::Constructed(Vote::Finalize(_)) => {
                    panic!("finalize vote should not be broadcast");
                }
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                }
                _ => continue,
            }
        }
    });
}
/// Runs `certificate_conflicts_proposal` once per signing fixture: the
/// threshold-VRF fixtures use the `Random` elector, all others use `RoundRobin`.
#[test_traced]
fn test_certificate_conflicts_proposal() {
certificate_conflicts_proposal::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
certificate_conflicts_proposal::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
certificate_conflicts_proposal::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
certificate_conflicts_proposal::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
certificate_conflicts_proposal::<_, _, RoundRobin>(ed25519::fixture);
certificate_conflicts_proposal::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: the voter first receives a notarization certificate for proposal A in
/// view 2 and only afterwards a conflicting proposal B for the same view.
///
/// The notarization for A must be forwarded to the resolver and recorded by the
/// reporter, and the finalize vote the voter constructs must be for A. The
/// conflicting proposal B is then delivered and the test lets the voter run a
/// little longer; nothing is asserted after that point.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn proposal_conflicts_certificate<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"proposal_conflicts_certificate_test".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        // Participants, schemes and simulated network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Voter for participant 0.
        let elector = L::default();
        let (mut mailbox, mut batcher_receiver, mut resolver_receiver, _, reporter) =
            setup_voter(
                &mut context,
                &oracle,
                &participants,
                &schemes,
                elector,
                Duration::from_millis(500),
                Duration::from_secs(1000),
                Duration::from_secs(1000),
                mocks::application::Certifier::Sometimes,
            )
            .await;

        // Acknowledge the initial update.
        let message = batcher_receiver.recv().await.unwrap();
        match message {
            batcher::Message::Update { response, .. } => {
                response.send(None).unwrap();
            }
            _ => panic!("unexpected batcher message"),
        }

        // Two conflicting proposals for view 2.
        let view = View::new(2);
        let proposal_a = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"proposal_a"),
        );
        let proposal_b = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"proposal_b"),
        );

        // The certificate for A arrives before any proposal.
        let (_, notarization_a) = build_notarization(&schemes, &proposal_a, quorum);
        mailbox
            .recovered(Certificate::Notarization(notarization_a.clone()))
            .await;

        // The certificate is forwarded to the resolver.
        let msg = resolver_receiver.recv().await.unwrap();
        match msg {
            MailboxMessage::Certificate(Certificate::Notarization(notarization)) => {
                assert_eq!(notarization.proposal, proposal_a);
            }
            _ => panic!("unexpected resolver message"),
        }

        // Poll the reporter until it has recorded the notarization for A. The lock
        // guard is scoped so it is released before sleeping.
        loop {
            {
                let notarizations = reporter.notarizations.lock();
                if matches!(
                    notarizations.get(&view),
                    Some(notarization) if notarization == &notarization_a
                ) {
                    break;
                }
            }
            context.sleep(Duration::from_millis(10)).await;
        }

        // Wait for the finalize vote; it must reference proposal A.
        loop {
            let message = batcher_receiver.recv().await.unwrap();
            match message {
                batcher::Message::Constructed(Vote::Finalize(finalize)) => {
                    assert_eq!(
                        finalize.proposal, proposal_a,
                        "finalize should be for certificate's proposal A"
                    );
                    break;
                }
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                }
                _ => context.sleep(Duration::from_millis(10)).await,
            }
        }

        // Deliver the conflicting proposal B and let the voter process it.
        mailbox.proposal(proposal_b.clone()).await;
        context.sleep(Duration::from_millis(100)).await;
    });
}
/// Runs `proposal_conflicts_certificate` once per signing fixture: the
/// threshold-VRF fixtures use the `Random` elector, all others use `RoundRobin`.
#[test_traced]
fn test_proposal_conflicts_certificate() {
proposal_conflicts_certificate::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
proposal_conflicts_certificate::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
proposal_conflicts_certificate::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
proposal_conflicts_certificate::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
proposal_conflicts_certificate::<_, _, RoundRobin>(ed25519::fixture);
proposal_conflicts_certificate::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Scenario: the voter receives a proposal for view 2 and, while the mock
/// application is configured with a very large verify latency, a notarization
/// certificate for that same proposal.
///
/// The notarization must be forwarded to the resolver and recorded by the
/// reporter, and the voter must construct a finalize vote for the proposal.
///
/// `fixture` builds the participants and signing schemes; `L` selects the elector.
fn certificate_verifies_proposal<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let n = 5;
    let quorum = quorum(n);
    // NOTE(review): this namespace matches `certificate_conflicts_proposal` and looks
    // copy-pasted; left unchanged because it feeds the signing fixture.
    let namespace = b"certificate_conflicts_proposal_test".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        // Participants, schemes and simulated network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Mock reporter.
        let elector = L::default();
        let reporter_cfg = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[0].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);

        // Mock application. The verify latency is several orders of magnitude larger
        // than the other latencies (presumably so that verification cannot complete
        // within the test's time budget; confirm the unit in the mock).
        let relay = Arc::new(mocks::relay::Relay::new());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: participants[0].clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (100_000.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Sometimes,
        };
        let (actor, application) =
            mocks::application::Application::new(context.with_label("app"), application_cfg);
        actor.start();

        // Voter under test, acting as participant 0.
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(participants[0].clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: "voter_certificate_verifies_proposal_test".to_string(),
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1000),
            timeout_retry: Duration::from_secs(1000),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);

        // Local channels stand in for the resolver and batcher actors.
        let (resolver_sender, mut resolver_receiver) = mpsc::channel(8);
        let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        let batcher_mailbox = batcher::Mailbox::new(batcher_sender);

        // Network channels 0 (votes) and 1 (certificates); receivers are discarded.
        let me = participants[0].clone();
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (certificate_sender, _) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher_mailbox,
            resolver_mailbox,
            vote_sender,
            certificate_sender,
        );

        // Acknowledge the initial update.
        let message = batcher_receiver.recv().await.unwrap();
        match message {
            batcher::Message::Update { response, .. } => {
                response.send(None).unwrap();
            }
            _ => panic!("unexpected batcher message"),
        }

        // Deliver the proposal for view 2 and give the voter time to pick it up.
        let view = View::new(2);
        let proposal = Proposal::new(
            Round::new(Epoch::new(333), view),
            view.previous().unwrap(),
            Sha256::hash(b"same_proposal"),
        );
        mailbox.proposal(proposal.clone()).await;
        context.sleep(Duration::from_millis(10)).await;

        // Deliver a notarization for the very same proposal.
        let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
        mailbox
            .recovered(Certificate::Notarization(notarization.clone()))
            .await;

        // The certificate is forwarded to the resolver.
        let msg = resolver_receiver.recv().await.unwrap();
        match msg {
            MailboxMessage::Certificate(Certificate::Notarization(n)) => {
                assert_eq!(n.proposal, proposal);
            }
            _ => panic!("unexpected resolver message"),
        }

        // Poll the reporter until it has recorded the notarization. The lock guard
        // is scoped so it is released before sleeping.
        loop {
            {
                let notarizations = reporter.notarizations.lock();
                if matches!(
                    notarizations.get(&view),
                    Some(n) if n == &notarization
                ) {
                    break;
                }
            }
            context.sleep(Duration::from_millis(10)).await;
        }

        // Wait for the finalize vote; it must reference the proposal.
        loop {
            let message = batcher_receiver.recv().await.unwrap();
            match message {
                batcher::Message::Constructed(Vote::Finalize(finalize)) => {
                    assert_eq!(finalize.proposal, proposal);
                    break;
                }
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                }
                _ => context.sleep(Duration::from_millis(10)).await,
            }
        }
    });
}
/// Runs `certificate_verifies_proposal` once for every signing-scheme fixture.
///
/// The two threshold-VRF fixtures are paired with the `Random` elector config;
/// all other fixtures use `RoundRobin`.
#[test_traced]
fn test_certificate_verifies_proposal() {
certificate_verifies_proposal::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
certificate_verifies_proposal::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
certificate_verifies_proposal::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
certificate_verifies_proposal::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
certificate_verifies_proposal::<_, _, RoundRobin>(ed25519::fixture);
certificate_verifies_proposal::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that a voter which is the leader of a view follows a conflicting
/// proposal for that view instead of its own.
///
/// The voter under test is configured as the round-robin leader of view 2 and
/// its application is given a comparatively slow `propose_latency`. After
/// view 1 is finalized, a different proposal for view 2 is injected through the
/// mailbox, followed by a notarization for that injected proposal. The test
/// passes once the voter constructs a finalize vote for the injected proposal.
///
/// `fixture` builds the participants and signing schemes for the scheme `S`.
fn drop_our_proposal_on_conflict<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"drop_our_proposal_on_conflict_test".to_vec();
    let epoch = Epoch::new(333);
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        // Only the participants and their schemes are needed; `..` ignores the rest.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Work out who leads view 2 so the voter under test can be that participant.
        let view2_round = Round::new(epoch, View::new(2));
        let elector_config = RoundRobin::<Sha256>::default();
        let temp_elector: RoundRobinElector<S> =
            elector_config.clone().build(schemes[0].participants());
        let leader_idx = temp_elector.elect(view2_round, None);
        let leader = participants[usize::from(leader_idx)].clone();
        let leader_scheme = schemes[usize::from(leader_idx)].clone();

        // Mock application; its propose latency is larger than its verify/certify
        // latencies (units are defined by the mock — presumably milliseconds,
        // TODO confirm).
        let relay = Arc::new(mocks::relay::Relay::new());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: leader.clone(),
            propose_latency: (50.0, 10.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (actor, application) =
            mocks::application::Application::new(context.with_label("app"), application_cfg);
        actor.start();

        let reporter_cfg = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: leader_scheme.clone(),
            elector: elector_config.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);

        // Voter running as the view-2 leader.
        let voter_cfg = Config {
            scheme: leader_scheme.clone(),
            elector: elector_config,
            blocker: oracle.control(leader.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: "voter_leader".to_string(),
            epoch,
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1000),
            timeout_retry: Duration::from_secs(1000),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);

        // The test plays the role of the resolver and batcher via raw channels.
        let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
        let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
        let (vote_sender, _) = oracle
            .control(leader.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (certificate_sender, _certificate_receiver) = oracle
            .control(leader.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher_mailbox,
            resolver_mailbox,
            vote_sender,
            certificate_sender,
        );

        // The first batcher message must be the update for view 1 with nothing finalized.
        let message = batcher_receiver.recv().await.unwrap();
        match message {
            batcher::Message::Update {
                current,
                leader: _,
                finalized,
                response,
                ..
            } => {
                assert_eq!(current, View::new(1));
                assert_eq!(finalized, View::new(0));
                response.send(None).unwrap();
            }
            _ => panic!("unexpected batcher message"),
        }

        // Finalize view 1 so the voter moves on to view 2.
        let view1_round = Round::new(epoch, View::new(1));
        let view1_proposal =
            Proposal::new(view1_round, View::new(0), Sha256::hash(b"view1_payload"));
        let (_, finalization) = build_finalization(&schemes, &view1_proposal, quorum);
        mailbox
            .recovered(Certificate::Finalization(finalization))
            .await;

        // Skip non-update messages; the next update must report view 2 / finalized 1.
        loop {
            let message = batcher_receiver.recv().await.unwrap();
            match message {
                batcher::Message::Update {
                    current,
                    leader: _,
                    finalized,
                    response,
                    ..
                } => {
                    assert_eq!(current, View::new(2));
                    assert_eq!(finalized, View::new(1));
                    response.send(None).unwrap();
                    break;
                }
                _ => {
                    continue;
                }
            }
        }

        // Inject a different proposal for view 2 shortly after the view starts.
        context.sleep(Duration::from_millis(5)).await;
        let conflicting_proposal =
            Proposal::new(view2_round, View::new(1), Sha256::hash(b"leader_proposal"));
        mailbox.proposal(conflicting_proposal.clone()).await;

        // Anything already queued for the batcher must be a notarize vote for the
        // injected proposal. `assert_eq!` prints both proposals on mismatch.
        while let Ok(message) = batcher_receiver.try_recv() {
            match message {
                batcher::Message::Constructed(Vote::Notarize(notarize)) => {
                    assert_eq!(notarize.proposal, conflicting_proposal);
                }
                _ => panic!("unexpected batcher message"),
            }
        }

        // Wait before delivering the notarization for the injected proposal.
        context.sleep(Duration::from_millis(100)).await;
        let (_, conflicting_notarization) =
            build_notarization(&schemes, &conflicting_proposal, quorum);
        mailbox
            .recovered(Certificate::Notarization(conflicting_notarization.clone()))
            .await;

        // The voter must eventually construct a finalize vote for the injected proposal.
        loop {
            let message = batcher_receiver.recv().await.unwrap();
            match message {
                batcher::Message::Constructed(Vote::Finalize(f)) => {
                    assert_eq!(f.proposal, conflicting_proposal);
                    break;
                }
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                }
                _ => context.sleep(Duration::from_millis(10)).await,
            }
        }
    });
}
/// Runs `drop_our_proposal_on_conflict` once for every signing-scheme fixture.
#[test_traced]
fn test_drop_our_proposal_on_conflict() {
drop_our_proposal_on_conflict(bls12381_threshold_vrf::fixture::<MinPk, _>);
drop_our_proposal_on_conflict(bls12381_threshold_vrf::fixture::<MinSig, _>);
drop_our_proposal_on_conflict(bls12381_multisig::fixture::<MinPk, _>);
drop_our_proposal_on_conflict(bls12381_multisig::fixture::<MinSig, _>);
drop_our_proposal_on_conflict(ed25519::fixture);
drop_our_proposal_on_conflict(secp256r1::fixture);
}
/// Checks that a voter restarted on the same storage partition hands a
/// previously recovered finalization to its resolver again.
///
/// A first voter receives a finalization for view 2 via `mailbox.recovered`
/// and forwards it to the resolver. That voter is aborted, a second voter is
/// created with the same `partition`, and the test asserts that its first
/// batcher update reports view 3 / finalized 2 and that the resolver receives
/// the same finalization.
///
/// `fixture` builds the participants and signing schemes; `L` selects the
/// elector configuration.
fn populate_resolver_on_restart<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"populate_resolver_on_restart_test".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = L::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: participants[0].clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Sometimes,
};
let (actor, application) =
mocks::application::Application::new(context.with_label("app"), application_cfg);
actor.start();
// Configuration of the first voter instance (participant 0).
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(participants[0].clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "voter_populate_resolver_on_restart_test".to_string(),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_millis(500),
certification_timeout: Duration::from_secs(1000),
timeout_retry: Duration::from_secs(1000),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.with_label("voter"), voter_cfg);
// The test stands in for the resolver and batcher through raw channels.
let (resolver_sender, mut resolver_receiver) = mpsc::channel(8);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let me = participants[0].clone();
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
// Keep the handle so the first voter can be aborted later.
let handle = voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// The first batcher message must be the update for view 1 with nothing finalized.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(1));
assert_eq!(finalized, View::new(0));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// Deliver a finalization for view 2 directly (no notarization is sent first).
let view = View::new(2);
let proposal = Proposal::new(
Round::new(Epoch::new(333), view),
view.previous().unwrap(),
Sha256::hash(b"finalize_without_notarization"),
);
let (_, expected_finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Finalization(expected_finalization.clone()))
.await;
// The first voter must forward that finalization to the resolver.
let finalization = resolver_receiver.recv().await.unwrap();
match finalization {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization, expected_finalization);
}
_ => panic!("unexpected resolver message"),
}
// Stop the first voter to simulate a restart.
handle.abort();
// Second voter: same settings and, importantly, the same partition name.
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(participants[0].clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "voter_populate_resolver_on_restart_test".to_string(),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_millis(500),
certification_timeout: Duration::from_secs(1000),
timeout_retry: Duration::from_secs(1000),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, _mailbox) = Actor::new(context.with_label("voter_restarted"), voter_cfg);
// Fresh resolver/batcher channels for the restarted voter.
let (resolver_sender, mut resolver_receiver) = mpsc::channel(8);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let me = participants[0].clone();
// The restarted voter registers on channels 2 and 3 instead of 0 and 1.
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(2, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(3, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// After restart the first update must already reflect the finalized view 2.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(3));
assert_eq!(finalized, View::new(2));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// The restarted voter must send the same finalization to the new resolver channel.
let finalization = resolver_receiver.recv().await.unwrap();
match finalization {
MailboxMessage::Certificate(Certificate::Finalization(finalization)) => {
assert_eq!(finalization, expected_finalization);
}
_ => panic!("unexpected resolver message"),
}
});
}
/// Runs `populate_resolver_on_restart` once for every signing-scheme fixture.
///
/// The two threshold-VRF fixtures are paired with the `Random` elector config;
/// all other fixtures use `RoundRobin`.
#[test_traced]
fn test_populate_resolver_on_restart() {
populate_resolver_on_restart::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
populate_resolver_on_restart::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
populate_resolver_on_restart::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
populate_resolver_on_restart::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
populate_resolver_on_restart::<_, _, RoundRobin>(ed25519::fixture);
populate_resolver_on_restart::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that a timeout reason returned in response to a restarted voter's
/// first batcher update makes it nullify the recovered view right away.
///
/// A first voter is advanced to view 3 and aborted. A second voter is started
/// on the same partition; the test answers its first update with
/// `Some(TimeoutReason::LeaderNullify)` and then requires a nullify vote for
/// view 3 within one second, although the configured leader and certification
/// timeouts are ten seconds.
///
/// `fixture` builds the participants and signing schemes for the scheme `S`.
fn startup_update_timeout_hint_nullifies_recovered_view<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"startup_update_timeout_hint_nullify".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(20));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
let me = participants[0].clone();
let elector = RoundRobin::<Sha256>::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Always,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app"), app_cfg);
app_actor.start();
let partition = "voter_startup_update_timeout_hint_nullify".to_string();
let epoch = Epoch::new(333);
// Both voter instances share every setting (including the partition); only
// the page cache is supplied per instance.
let make_cfg = |page_cache: CacheRef| Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: partition.clone(),
epoch,
mailbox_size: 128,
// Timeouts are far longer than the 1s window checked at the end of the test.
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache,
};
// First voter instance.
let cfg = make_cfg(CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE));
let (voter, mut mailbox) = Actor::new(context.with_label("voter_initial"), cfg);
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(32);
let (vote_sender, _) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let handle = voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
certificate_sender,
);
// Acknowledge the initial update without a timeout reason.
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => panic!("expected initial update"),
}
let target_view = View::new(3);
// NOTE(review): `advance_to_view` is defined elsewhere; presumably it drives the
// voter until it enters `target_view` — the post-restart assertions below
// expect current == target_view and finalized == target_view - 1.
advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Stop the first voter to simulate a restart.
handle.abort();
// Second voter instance on the same partition, with channels 2 and 3.
let cfg = make_cfg(CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE));
let (voter, _mailbox) = Actor::new(context.with_label("voter_restarted"), cfg);
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(32);
let (vote_sender, _) = oracle
.control(me.clone())
.register(2, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _) = oracle
.control(me.clone())
.register(3, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
certificate_sender,
);
// Answer the restarted voter's first update with a timeout reason.
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
finalized,
response,
..
} => {
assert_eq!(current, target_view);
assert_eq!(finalized, target_view.previous().unwrap());
response.send(Some(TimeoutReason::LeaderNullify)).unwrap();
}
_ => panic!("expected startup update after restart"),
}
// A nullify vote for the recovered view must arrive before a 1s sleep elapses.
// The sleep is re-created each loop iteration, so the bound applies per message.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(1)) => {
panic!(
"expected immediate nullify for recovered view {target_view} from startup timeout hint"
);
},
}
}
});
}
/// Runs `startup_update_timeout_hint_nullifies_recovered_view` once for every
/// signing-scheme fixture. The helper's generic parameters are inferred from
/// the fixture argument.
#[test_traced]
fn test_startup_update_timeout_hint_nullifies_recovered_view() {
    // Threshold VRF fixtures.
    startup_update_timeout_hint_nullifies_recovered_view(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    startup_update_timeout_hint_nullifies_recovered_view(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    // Multisig fixtures.
    startup_update_timeout_hint_nullifies_recovered_view(bls12381_multisig::fixture::<MinPk, _>);
    startup_update_timeout_hint_nullifies_recovered_view(bls12381_multisig::fixture::<MinSig, _>);
    // Plain signature fixtures.
    startup_update_timeout_hint_nullifies_recovered_view(ed25519::fixture);
    startup_update_timeout_hint_nullifies_recovered_view(secp256r1::fixture);
}
/// Checks that a finalization delivered through `mailbox.recovered` is
/// recorded by the reporter and reflected in the batcher's finalized view.
///
/// `fixture` builds the participants and signing schemes; `L` selects the
/// elector configuration.
fn finalization_from_resolver<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"finalization_from_resolver".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = L::default();
// NOTE(review): `setup_voter` is defined elsewhere. The three durations presumably
// map to leader timeout, certification timeout and timeout retry (the same
// values other tests in this file put in those `Config` fields) — confirm.
// Only the mailbox, the batcher receiver and the reporter are used here.
let (mut mailbox, mut batcher_receiver, _, _, reporter) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_millis(500),
Duration::from_secs(1000),
Duration::from_secs(1000),
mocks::application::Certifier::Sometimes,
)
.await;
// The first batcher message must be the update for view 1 with nothing finalized.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(1));
assert_eq!(finalized, View::new(0));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// Build a finalization for view 2 and hand it to the voter.
let view = View::new(2);
let proposal = Proposal::new(
Round::new(Epoch::new(333), view),
view.previous().unwrap(),
Sha256::hash(b"finalization_from_resolver"),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Finalization(finalization.clone()))
.await;
// Wait for an update whose finalized view equals `view`. Messages are discarded
// here without sending anything on an update's `response` channel.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update { finalized, .. } if finalized == view => break,
_ => continue,
}
}
// The reporter must hold exactly the finalization that was delivered.
let finalizations = reporter.finalizations.lock();
let recorded = finalizations
.get(&view)
.expect("finalization should be recorded");
assert_eq!(recorded, &finalization);
});
}
/// Runs `finalization_from_resolver` once for every signing-scheme fixture.
///
/// The two threshold-VRF fixtures are paired with the `Random` elector config;
/// all other fixtures use `RoundRobin`.
#[test_traced]
fn test_finalization_from_resolver() {
finalization_from_resolver::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
finalization_from_resolver::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
finalization_from_resolver::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
finalization_from_resolver::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
finalization_from_resolver::<_, _, RoundRobin>(ed25519::fixture);
finalization_from_resolver::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that a finalization delivered through `mailbox.resolved` is recorded
/// by the reporter but is not sent back to the resolver mailbox.
///
/// `fixture` builds the participants and signing schemes; `L` selects the
/// elector configuration.
fn no_resolver_boomerang<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"no_resolver_boomerang".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = L::default();
// NOTE(review): `setup_voter` is defined elsewhere. The three durations presumably
// map to leader timeout, certification timeout and timeout retry — confirm.
// This test also keeps the resolver receiver to observe what the voter sends.
let (mut mailbox, mut batcher_receiver, mut resolver_receiver, _, reporter) =
setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_millis(500),
Duration::from_secs(1000),
Duration::from_secs(1000),
mocks::application::Certifier::Sometimes,
)
.await;
// The first batcher message must be the update for view 1 with nothing finalized.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
current,
leader: _,
finalized,
response,
..
} => {
assert_eq!(current, View::new(1));
assert_eq!(finalized, View::new(0));
response.send(None).unwrap();
}
_ => panic!("unexpected batcher message"),
}
// Build a finalization for view 2 and deliver it via `resolved`
// (not `recovered`).
let view = View::new(2);
let proposal = Proposal::new(
Round::new(Epoch::new(333), view),
view.previous().unwrap(),
Sha256::hash(b"no_resolver_boomerang"),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization.clone()))
.await;
// Acknowledge every update; stop once one reports `view` as finalized.
loop {
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update {
finalized,
response,
..
} if finalized == view => {
response.send(None).unwrap();
break;
}
batcher::Message::Update { response, .. } => {
response.send(None).unwrap();
}
_ => continue,
}
}
// The reporter must hold exactly the finalization that was delivered.
let finalizations = reporter.finalizations.lock();
let recorded = finalizations
.get(&view)
.expect("finalization should be recorded");
assert_eq!(recorded, &finalization);
// Release the lock before polling the resolver channel.
drop(finalizations);
// `now_or_never` polls once: `None` means no resolver message is ready.
assert!(
resolver_receiver.recv().now_or_never().is_none(),
"resolver should not receive the certificate back"
);
});
}
/// Runs `no_resolver_boomerang` once for every signing-scheme fixture.
///
/// The two threshold-VRF fixtures are paired with the `Random` elector config;
/// all other fixtures use `RoundRobin`.
#[test_traced]
fn test_no_resolver_boomerang() {
no_resolver_boomerang::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
no_resolver_boomerang::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
no_resolver_boomerang::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
no_resolver_boomerang::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
no_resolver_boomerang::<_, _, RoundRobin>(ed25519::fixture);
no_resolver_boomerang::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that the voter constructs a nullify vote within one second when the
/// application is set to fail verification, even though the leader and
/// certification timeouts are ten seconds.
///
/// The test finalizes views until the voter reaches a view whose leader is not
/// participant 0, publishes a proposal for that view on behalf of the leader,
/// and waits for a nullify vote for that view.
///
/// `fixture` builds the participants and signing schemes; `L` selects the
/// elector configuration.
fn verification_failure_emits_nullify_immediately<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"consensus".to_vec();
let activity_timeout = ViewDelta::new(10);
let executor = deterministic::Runner::timed(Duration::from_secs(5));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// The voter under test is participant 0.
let signing = schemes[0].clone();
let me = participants[0].clone();
let elector = L::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
// Verification latency is set higher than the propose/certify latencies
// (units are defined by the mock — presumably milliseconds, TODO confirm).
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (10.0, 0.0), certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Always,
};
let (mut actor, application) =
mocks::application::Application::new(context.with_label("app"), application_cfg);
// Configure the mock application to fail verification.
actor.set_fail_verification(true);
actor.start();
let voter_cfg = Config {
scheme: signing.clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: format!("voter_verify_fail_test_{me}"),
epoch: Epoch::new(333),
mailbox_size: 128,
// Timeouts are far longer than the 1s window checked at the end of the test.
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_mins(60),
activity_timeout,
replay_buffer: NZUsize!(10240),
write_buffer: NZUsize!(10240),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);
// The test stands in for the resolver and batcher through raw channels.
let (resolver_sender, _resolver_receiver) = mpsc::channel(2);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(16);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// Acknowledge the initial update without a timeout reason.
let message = batcher_receiver.recv().await.unwrap();
match message {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => panic!("expected Update message"),
}
// Finalize one view after another until the voter enters a view whose
// leader is someone other than participant 0.
let mut current_view = View::new(1);
let mut prev_proposal = Proposal::new(
Round::new(Epoch::new(333), current_view),
View::zero(),
Sha256::hash(b"v0"),
);
let (target_view, leader) = loop {
let (_, finalization) = build_finalization(&schemes, &prev_proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization))
.await;
// Acknowledge updates until one reports a view beyond `current_view`.
let (new_view, leader) = loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} => {
response.send(None).unwrap();
if current > current_view {
break (current, leader);
}
}
batcher::Message::Constructed(_) => {}
}
};
current_view = new_view;
if leader != Participant::new(0) {
break (current_view, participants[usize::from(leader)].clone());
}
// We lead this view: prepare a proposal for it so the next iteration can finalize it.
// Its payload is the hash of the view number's big-endian bytes.
prev_proposal = Proposal::new(
Round::new(Epoch::new(333), current_view),
current_view.previous().unwrap(),
Sha256::hash(current_view.get().to_be_bytes().as_slice()),
);
};
// Proposal for the target view, to be attributed to that view's leader.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"test_proposal"),
);
// Hash of the previous view number, computed the same way as the payloads
// used in the loop above.
let parent_payload = Sha256::hash(
target_view
.previous()
.unwrap()
.get()
.to_be_bytes()
.as_slice(),
);
// Publish the proposal's contents on the relay as coming from the leader,
// then hand the proposal to the voter.
let contents = (proposal.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal).await;
// A nullify vote for the target view must arrive before a 1s sleep elapses.
// The sleep is re-created each loop iteration, so the bound applies per message.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(1)) => {
panic!(
"expected nullify for view {} within 1s (timeouts are 10s)",
target_view
);
},
}
}
});
}
/// Runs `verification_failure_emits_nullify_immediately` once for every
/// signing-scheme fixture.
///
/// The two threshold-VRF fixtures are paired with the `Random` elector config;
/// all other fixtures use `RoundRobin`.
#[test_traced]
fn test_verification_failure_emits_nullify_immediately() {
verification_failure_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
verification_failure_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
verification_failure_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
verification_failure_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
verification_failure_emits_nullify_immediately::<_, _, RoundRobin>(ed25519::fixture);
verification_failure_emits_nullify_immediately::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that a `TimeoutReason::LeaderNullify` timeout sent through the
/// mailbox makes the voter construct a nullify vote within one second, and
/// that sending the same timeout again produces no second nullify vote for
/// that view within 300ms.
///
/// The configured leader and certification timeouts are ten seconds. Before
/// sending the timeout, the test finalizes views until participant 0 is not
/// the leader of the current view.
///
/// `fixture` builds the participants and signing schemes; `L` selects the
/// elector configuration.
fn leader_nullify_fast_paths_timeout<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"leader_nullify_fast_paths_timeout".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(5));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// The voter under test is participant 0.
let me = participants[0].clone();
let me_idx = Participant::new(0);
let signing = schemes[0].clone();
let elector = L::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Always,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app"), app_cfg);
app_actor.start();
let voter_cfg = Config {
scheme: signing.clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: format!("voter_leader_nullify_fast_path_{me}"),
epoch,
mailbox_size: 128,
// Timeouts are far longer than the windows checked later in the test.
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(10240),
write_buffer: NZUsize!(10240),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);
// The test stands in for the resolver and batcher through raw channels.
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(32);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// Acknowledge the initial update and remember its view and leader.
let (mut current_view, mut current_leader) =
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} => {
response.send(None).unwrap();
(current, leader)
}
_ => panic!("expected initial update"),
};
// While participant 0 leads the current view, finalize that view to move on.
while current_leader == me_idx {
let proposal = Proposal::new(
Round::new(epoch, current_view),
current_view.previous().unwrap_or(View::zero()),
Sha256::hash(current_view.get().to_be_bytes().as_slice()),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization))
.await;
// Acknowledge updates until one reports a view beyond `current_view`.
loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} if current > current_view => {
response.send(None).unwrap();
current_view = current;
current_leader = leader;
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(_) => {}
}
}
}
// First timeout: a nullify vote for this view is expected.
let target_view = current_view;
mailbox
.timeout(target_view, TimeoutReason::LeaderNullify)
.await;
// The sleep is re-created each loop iteration, so the 1s bound applies per message.
loop {
select! {
message = batcher_receiver.recv() => match message.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Constructed(_) => {}
},
_ = context.sleep(Duration::from_secs(1)) => {
panic!(
"expected nullify for view {} within 1s (timeouts are 10s)",
target_view
);
},
}
}
// Second, identical timeout: no further nullify vote for this view is allowed.
mailbox
.timeout(target_view, TimeoutReason::LeaderNullify)
.await;
// Fixed deadline (unlike the per-iteration sleep above) bounding the whole window.
let duplicate_window = context.current() + Duration::from_millis(300);
loop {
select! {
_ = context.sleep_until(duplicate_window) => break,
message = batcher_receiver.recv() => match message.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
panic!("duplicate leader nullify should not retrigger fast-path");
}
batcher::Message::Constructed(_) => {}
},
}
}
});
}
/// Runs `leader_nullify_fast_paths_timeout` once for every signing-scheme fixture.
///
/// The two `bls12381_threshold_vrf` fixtures are paired with the `Random` elector; every
/// other fixture is paired with `RoundRobin`.
#[test_traced]
fn test_leader_nullify_fast_paths_timeout() {
leader_nullify_fast_paths_timeout::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
leader_nullify_fast_paths_timeout::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
leader_nullify_fast_paths_timeout::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
leader_nullify_fast_paths_timeout::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
leader_nullify_fast_paths_timeout::<_, _, RoundRobin>(ed25519::fixture);
leader_nullify_fast_paths_timeout::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Starts a single voter (participant 0, `RoundRobin` elector) whose mock application has
/// `set_drop_proposals(true)`, advances it to a view that it leads, and then requires a
/// nullify vote for that view to reach the batcher channel.
///
/// The leader and certification timeouts are configured to ten seconds while the wait for
/// the nullify gives up after one second without a batcher message, so the test panics if
/// the nullify only shows up after a long quiet period.
///
/// `fixture` produces the participants and signing schemes for the scheme under test.
fn dropped_propose_emits_nullify_immediately<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let validators = 5;
    let threshold = quorum(validators);
    let namespace = b"dropped_propose_emits_nullify_immediately".to_vec();
    let epoch = Epoch::new(333);
    deterministic::Runner::timed(Duration::from_secs(10)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, validators);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // The voter under test is always participant 0.
        let me = participants[0].clone();
        let local_idx = Participant::new(0);
        let local_scheme = schemes[0].clone();
        let elector = RoundRobin::<Sha256>::default();

        // Mock reporter.
        let reporter = mocks::reporter::Reporter::new(
            context.with_label("reporter"),
            mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: local_scheme.clone(),
                elector: elector.clone(),
            },
        );

        // Mock application with proposal dropping switched on before it is started.
        let relay = Arc::new(mocks::relay::Relay::new());
        let (mut app_actor, application) = mocks::application::Application::new(
            context.with_label("app"),
            mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (1.0, 0.0),
                verify_latency: (1.0, 0.0),
                certify_latency: (1.0, 0.0),
                should_certify: mocks::application::Certifier::Always,
            },
        );
        app_actor.set_drop_proposals(true);
        app_actor.start();

        // Voter with 10s leader/certification timeouts and a 60 minute retry.
        let voter_cfg = Config {
            scheme: local_scheme.clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: format!("voter_drop_propose_test_{me}"),
            epoch,
            mailbox_size: 128,
            leader_timeout: Duration::from_secs(10),
            certification_timeout: Duration::from_secs(10),
            timeout_retry: Duration::from_mins(60),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(10240),
            write_buffer: NZUsize!(10240),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);

        // The test plays the batcher itself; the unused receivers stay bound so their
        // channels remain open for the duration of the test.
        let (resolver_tx, _resolver_receiver) = mpsc::channel(8);
        let (batcher_tx, mut batcher_receiver) = mpsc::channel(32);
        let (vote_sender, _vote_receiver) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (certificate_sender, _certificate_receiver) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher::Mailbox::new(batcher_tx),
            resolver::Mailbox::new(resolver_tx),
            vote_sender,
            certificate_sender,
        );

        // The first batcher message must be an update carrying the starting view and leader.
        let batcher::Message::Update {
            current,
            leader,
            response,
            ..
        } = batcher_receiver.recv().await.unwrap()
        else {
            panic!("expected initial update");
        };
        response.send(None).unwrap();
        let (mut view, mut view_leader) = (current, leader);

        // Finalize one view at a time until the voter itself is the leader.
        while view_leader != local_idx {
            let round = Round::new(epoch, view);
            let parent = view.previous().unwrap_or(View::zero());
            let payload = Sha256::hash(view.get().to_be_bytes().as_slice());
            let proposal = Proposal::new(round, parent, payload);
            let (_, finalization) = build_finalization(&schemes, &proposal, threshold);
            mailbox
                .resolved(Certificate::Finalization(finalization))
                .await;

            // Acknowledge every update; stop once one reports a later view.
            loop {
                match batcher_receiver.recv().await.unwrap() {
                    batcher::Message::Update {
                        current,
                        leader,
                        response,
                        ..
                    } => {
                        response.send(None).unwrap();
                        if current > view {
                            view = current;
                            view_leader = leader;
                            break;
                        }
                    }
                    batcher::Message::Constructed(_) => {}
                }
            }
        }

        // Wait for the nullify of the led view. The one-second timer is re-armed on every
        // loop iteration, i.e. after each batcher message that was not the expected vote.
        let target_view = view;
        loop {
            select! {
                message = batcher_receiver.recv() => match message.unwrap() {
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(vote) => {
                        if let Vote::Nullify(nullify) = vote {
                            if nullify.view() == target_view {
                                break;
                            }
                        }
                    }
                },
                _ = context.sleep(Duration::from_secs(1)) => {
                    panic!(
                        "expected nullify for view {} within 1s (timeouts are 10s)",
                        target_view
                    );
                },
            }
        }
    });
}
/// Runs `dropped_propose_emits_nullify_immediately` once for every signing-scheme fixture.
#[test_traced]
fn test_dropped_propose_emits_nullify_immediately() {
dropped_propose_emits_nullify_immediately(bls12381_threshold_vrf::fixture::<MinPk, _>);
dropped_propose_emits_nullify_immediately(bls12381_threshold_vrf::fixture::<MinSig, _>);
dropped_propose_emits_nullify_immediately(bls12381_multisig::fixture::<MinPk, _>);
dropped_propose_emits_nullify_immediately(bls12381_multisig::fixture::<MinSig, _>);
dropped_propose_emits_nullify_immediately(ed25519::fixture);
dropped_propose_emits_nullify_immediately(secp256r1::fixture);
}
/// Starts a single voter (participant 0) whose mock application has
/// `set_drop_verifications(true)`, hands it a proposal for a view led by another
/// participant, and requires a nullify vote for that view to reach the batcher channel.
/// Afterwards the encoded metrics must contain a `_timeouts` line labelled
/// `reason="IgnoredProposal"` whose value is not `0`.
///
/// `fixture` produces the participants and signing schemes; `L` is the elector
/// configuration and is instantiated with `L::default()`.
fn dropped_verify_emits_nullify_immediately<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"dropped_verify_emits_nullify_immediately".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// The voter under test is always participant 0.
let me = participants[0].clone();
let signing = schemes[0].clone();
let elector = L::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Always,
};
let (mut app_actor, application) =
mocks::application::Application::new(context.with_label("app"), application_cfg);
// Verification dropping is switched on before the application actor is started.
app_actor.set_drop_verifications(true);
app_actor.start();
// Leader and certification timeouts are 10s; the nullify wait below gives up after 1s
// of silence. Presumably this is what shows the nullify did not come from a timer
// expiring (TODO confirm against the voter actor).
let voter_cfg = Config {
scheme: signing.clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: format!("voter_drop_verify_test_{me}"),
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(10240),
write_buffer: NZUsize!(10240),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);
// The test plays the batcher itself; the unused receivers stay bound for the whole test.
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(32);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// The first batcher message must be an update; its contents are not inspected.
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => panic!("expected initial update"),
}
// NOTE(review): assumes the voter starts in view 1; the initial update's `current`
// field is not read to confirm this.
let mut current_view = View::new(1);
// Finalize at least one view, and keep finalizing until an update names a leader other
// than participant 0. Yields that view together with its leader's public key.
let (target_view, leader) = loop {
let proposal = Proposal::new(
Round::new(epoch, current_view),
current_view.previous().unwrap_or(View::zero()),
Sha256::hash(current_view.get().to_be_bytes().as_slice()),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization))
.await;
// Acknowledge every update; stop at the first one that reports a later view.
let (new_view, leader) = loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} => {
response.send(None).unwrap();
if current > current_view {
break (current, leader);
}
}
batcher::Message::Constructed(_) => {}
}
};
current_view = new_view;
if leader != Participant::new(0) {
break (current_view, participants[usize::from(leader)].clone());
}
};
// Proposal for the target view whose parent is the view finalized just before it.
let proposal = Proposal::new(
Round::new(epoch, target_view),
target_view.previous().unwrap(),
Sha256::hash(b"drop_verify"),
);
// Encoded tuple (round, hash of the parent view number, 7u64) published on the mock
// relay under the proposal's payload digest.
// NOTE(review): presumably the layout the mock application expects — confirm.
let contents = (
proposal.round,
Sha256::hash(
target_view
.previous()
.unwrap()
.get()
.to_be_bytes()
.as_slice(),
),
7u64,
)
.encode();
// Broadcast under the elected leader's key before handing the proposal to the voter.
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal).await;
// Wait for the nullify of the target view. The one-second timer is created anew on each
// loop iteration, so it measures silence since the last batcher message.
loop {
select! {
message = batcher_receiver.recv() => match message.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Constructed(_) => {}
},
_ = context.sleep(Duration::from_secs(1)) => {
panic!(
"expected nullify for view {} within 1s (timeouts are 10s)",
target_view
);
},
}
}
// Some `_timeouts` metric line with the IgnoredProposal reason must not end in " 0".
let encoded = context.encode();
assert!(
encoded.lines().any(|line| {
line.contains("_timeouts")
&& line.contains("reason=\"IgnoredProposal\"")
&& !line.ends_with(" 0")
}),
"expected non-zero timeout metric with reason=IgnoredProposal"
);
});
}
/// Runs `dropped_verify_emits_nullify_immediately` once for every signing-scheme fixture.
///
/// The two `bls12381_threshold_vrf` fixtures are paired with the `Random` elector; every
/// other fixture is paired with `RoundRobin`.
#[test_traced]
fn test_dropped_verify_emits_nullify_immediately() {
dropped_verify_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
dropped_verify_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
dropped_verify_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
dropped_verify_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
dropped_verify_emits_nullify_immediately::<_, _, RoundRobin>(ed25519::fixture);
dropped_verify_emits_nullify_immediately::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Builds a voter through `setup_voter` (10s leader and certification timeouts, 60 minute
/// retry), advances it past view 1 to a view led by someone other than participant 0, and
/// then delivers a proposal for that view whose parent lies one view before the view that
/// was just finalized.
///
/// The test requires a nullify vote for the target view to reach the batcher channel and
/// then checks that the encoded metrics contain a `_timeouts` line labelled
/// `reason="InvalidProposal"` whose value is not `0`.
///
/// `fixture` produces the participants and signing schemes; `L` is the elector
/// configuration and is instantiated with `L::default()`.
fn invalid_ancestry_emits_nullify_immediately<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S> + Default,
{
    let validators = 5;
    let threshold = quorum(validators);
    let namespace = b"invalid_ancestry_emits_nullify_immediately".to_vec();
    let epoch = Epoch::new(333);
    deterministic::Runner::timed(Duration::from_secs(10)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, validators);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Only the mailbox and the batcher receiver are kept; the other three values
        // returned by the helper are discarded immediately.
        let (mut mailbox, mut batcher_receiver, _, _, _) = setup_voter(
            &mut context,
            &oracle,
            &participants,
            &schemes,
            L::default(),
            Duration::from_secs(10),
            Duration::from_secs(10),
            Duration::from_mins(60),
            mocks::application::Certifier::Sometimes,
        )
        .await;
        let local_idx = Participant::new(0);

        // The first batcher message must be an update carrying the starting view and leader.
        let batcher::Message::Update {
            current,
            leader,
            response,
            ..
        } = batcher_receiver.recv().await.unwrap()
        else {
            panic!("expected initial update");
        };
        response.send(None).unwrap();
        let (mut view, mut view_leader) = (current, leader);

        // Finalize views until the current view is not view 1 and is led by someone else.
        loop {
            if view != View::new(1) && view_leader != local_idx {
                break;
            }
            let round = Round::new(epoch, view);
            let parent = view.previous().unwrap_or(View::zero());
            let payload = Sha256::hash(view.get().to_be_bytes().as_slice());
            let proposal = Proposal::new(round, parent, payload);
            let (_, finalization) = build_finalization(&schemes, &proposal, threshold);
            mailbox
                .resolved(Certificate::Finalization(finalization))
                .await;

            // Acknowledge every update; stop once one reports a later view.
            loop {
                match batcher_receiver.recv().await.unwrap() {
                    batcher::Message::Update {
                        current,
                        leader,
                        response,
                        ..
                    } => {
                        response.send(None).unwrap();
                        if current > view {
                            view = current;
                            view_leader = leader;
                            break;
                        }
                    }
                    batcher::Message::Constructed(_) => {}
                }
            }
        }

        // The proposal names a parent one view older than the view finalized by the loop
        // above (or view zero when no such view exists).
        let target_view = view;
        let finalized_view = target_view
            .previous()
            .expect("target view must have a finalized predecessor");
        let invalid_parent = finalized_view.previous().unwrap_or(View::zero());
        let proposal = Proposal::new(
            Round::new(epoch, target_view),
            invalid_parent,
            Sha256::hash(b"invalid_parent_before_finalized"),
        );
        mailbox.proposal(proposal).await;

        // Wait for the nullify of the target view. The one-second timer is re-armed on every
        // loop iteration, i.e. after each batcher message that was not the expected vote.
        loop {
            select! {
                message = batcher_receiver.recv() => match message.unwrap() {
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(vote) => {
                        if let Vote::Nullify(nullify) = vote {
                            if nullify.view() == target_view {
                                break;
                            }
                        }
                    }
                },
                _ = context.sleep(Duration::from_secs(1)) => {
                    panic!(
                        "expected nullify for view {} within 1s (timeouts are 10s)",
                        target_view
                    );
                },
            }
        }

        // Some `_timeouts` metric line with the InvalidProposal reason must not end in " 0".
        let encoded = context.encode();
        let recorded = encoded
            .lines()
            .filter(|line| line.contains("_timeouts"))
            .filter(|line| line.contains("reason=\"InvalidProposal\""))
            .any(|line| !line.ends_with(" 0"));
        assert!(
            recorded,
            "expected non-zero timeout metric with reason=InvalidProposal"
        );
    });
}
/// Runs `invalid_ancestry_emits_nullify_immediately` once for every signing-scheme fixture.
///
/// The two `bls12381_threshold_vrf` fixtures are paired with the `Random` elector; every
/// other fixture is paired with `RoundRobin`.
#[test_traced]
fn test_invalid_ancestry_emits_nullify_immediately() {
invalid_ancestry_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
invalid_ancestry_emits_nullify_immediately::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
invalid_ancestry_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
invalid_ancestry_emits_nullify_immediately::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
invalid_ancestry_emits_nullify_immediately::<_, _, RoundRobin>(ed25519::fixture);
invalid_ancestry_emits_nullify_immediately::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Starts a single voter (participant 0, `RoundRobin` elector) whose mock application has
/// `set_drop_verifications(true)`, and watches the votes it sends over the simulated
/// network through a second participant registered as an observer.
///
/// The test proceeds in two phases:
/// 1. advance to a view led by the voter and require a network vote for that view within
///    one second;
/// 2. advance to a later view led by someone else, deliver a proposal for it, and require
///    a network vote for that view within one second.
///
/// Leader and certification timeouts are 250ms here. The second assertion accepts any
/// decoded `Vote` for the target view and does not check which variant it is.
///
/// `fixture` produces the participants and signing schemes for the scheme under test.
fn dropped_verify_still_votes_after_prior_participation<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"dropped_verify_still_votes_after_prior_participation".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// The voter under test is always participant 0.
let me = participants[0].clone();
let me_idx = Participant::new(0);
let signing = schemes[0].clone();
let elector = RoundRobin::<Sha256>::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: signing.clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Always,
};
let (mut app_actor, application) =
mocks::application::Application::new(context.with_label("app"), application_cfg);
// Verification dropping is switched on before the application actor is started.
app_actor.set_drop_verifications(true);
app_actor.start();
// Short (250ms) leader and certification timeouts, 60 minute retry.
let voter_cfg = Config {
scheme: signing.clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: format!("voter_dropped_verify_after_participation_{me}"),
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_millis(250),
certification_timeout: Duration::from_millis(250),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(10240),
write_buffer: NZUsize!(10240),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.clone(), voter_cfg);
// The test plays the batcher itself; the unused receivers stay bound for the whole test.
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let resolver_mailbox = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(64);
let batcher_mailbox = batcher::Mailbox::new(batcher_sender);
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
// Participant 1 listens on channel 0 (the channel the voter's vote sender was registered
// on) over a zero-latency, lossless link from the voter.
let observer = participants[1].clone();
let (_, mut observer_vote_receiver) = oracle
.control(observer.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(
me.clone(),
observer,
Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
},
)
.await
.unwrap();
voter.start(
batcher_mailbox,
resolver_mailbox,
vote_sender,
certificate_sender,
);
// The first batcher message must be an update carrying the starting view and leader.
let (mut current_view, mut current_leader) =
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} => {
response.send(None).unwrap();
(current, leader)
}
_ => panic!("expected initial update"),
};
// Phase 1: finalize one view at a time until the voter itself is the leader.
while current_leader != me_idx {
let proposal = Proposal::new(
Round::new(epoch, current_view),
current_view.previous().unwrap_or(View::zero()),
Sha256::hash(current_view.get().to_be_bytes().as_slice()),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization))
.await;
// Acknowledge every update; stop once one reports a later view.
loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} if current > current_view => {
response.send(None).unwrap();
current_view = current;
current_leader = leader;
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(_) => {}
}
}
}
// Require a network vote for the led view before a fixed one-second deadline, while
// continuing to acknowledge batcher updates.
let leader_view = current_view;
let ready_deadline = context.current() + Duration::from_secs(1);
let mut became_ready = false;
loop {
select! {
_ = context.sleep_until(ready_deadline) => break,
message = batcher_receiver.recv() => match message.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(_) => {}
},
message = commonware_p2p::Receiver::recv(&mut observer_vote_receiver) => {
let (_, message) = message.unwrap();
let vote: Vote<S, Sha256Digest> = Vote::decode(message).unwrap();
if vote.view() == leader_view {
became_ready = true;
break;
}
},
}
}
assert!(
became_ready,
"expected a network vote in leader view {leader_view}"
);
// Phase 2: finalize at least one more view and keep going until an update names a leader
// other than the voter. Yields that view together with its leader's public key.
let (target_view, target_leader) = loop {
let proposal = Proposal::new(
Round::new(epoch, current_view),
current_view.previous().unwrap_or(View::zero()),
Sha256::hash(current_view.get().to_be_bytes().as_slice()),
);
let (_, finalization) = build_finalization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Finalization(finalization))
.await;
let mut found = None;
loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current,
leader,
response,
..
} if current > current_view => {
response.send(None).unwrap();
current_view = current;
if leader != me_idx {
found = Some((current, participants[usize::from(leader)].clone()));
}
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(_) => {}
}
}
if let Some(target) = found {
break target;
}
};
// Proposal for the target view whose parent is the view finalized just before it.
let proposal = Proposal::new(
Round::new(epoch, target_view),
target_view.previous().unwrap(),
Sha256::hash(b"drop_verify_after_ready"),
);
// Encoded tuple (round, hash of the parent view number, 11u64) published on the mock
// relay under the proposal's payload digest.
// NOTE(review): presumably the layout the mock application expects — confirm.
let contents = (
proposal.round,
Sha256::hash(
target_view
.previous()
.unwrap()
.get()
.to_be_bytes()
.as_slice(),
),
11u64,
)
.encode();
// Broadcast under the elected leader's key before handing the proposal to the voter.
relay.broadcast(&target_leader, (proposal.payload, contents));
mailbox.proposal(proposal).await;
// Require a network vote for the target view before a fixed one-second deadline.
let target_deadline = context.current() + Duration::from_secs(1);
let mut saw_target_network_vote = false;
loop {
select! {
_ = context.sleep_until(target_deadline) => break,
message = batcher_receiver.recv() => match message.unwrap() {
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
batcher::Message::Constructed(_) => {}
},
message = commonware_p2p::Receiver::recv(&mut observer_vote_receiver) => {
let (_, message) = message.unwrap();
let vote: Vote<S, Sha256Digest> = Vote::decode(message).unwrap();
if vote.view() == target_view {
saw_target_network_vote = true;
break;
}
},
}
}
assert!(
saw_target_network_vote,
"expected a network vote for target view {target_view} after dropped verification"
);
});
}
/// Runs `dropped_verify_still_votes_after_prior_participation` once for every
/// signing-scheme fixture.
#[test_traced]
fn test_dropped_verify_still_votes_after_prior_participation() {
dropped_verify_still_votes_after_prior_participation(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
dropped_verify_still_votes_after_prior_participation(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
dropped_verify_still_votes_after_prior_participation(
bls12381_multisig::fixture::<MinPk, _>,
);
dropped_verify_still_votes_after_prior_participation(
bls12381_multisig::fixture::<MinSig, _>,
);
dropped_verify_still_votes_after_prior_participation(ed25519::fixture);
dropped_verify_still_votes_after_prior_participation(secp256r1::fixture);
}
/// Counts invocations of the application's custom certifier closure across a voter restart.
///
/// Sequence:
/// 1. a finalization for view 2 is delivered via `mailbox.recovered`; the certifier must not
///    have been called;
/// 2. a proposal and a notarization for view 3 are delivered; once the voter moves past
///    view 3 the certifier must have been called exactly once;
/// 3. the voter task is aborted and a second voter is started with the same `partition`
///    string and a fresh mock application sharing the same call log; after its initial
///    update and a 100ms sleep the call count must still be one.
///
/// `fixture` produces the participants and signing schemes; `L` is the elector
/// configuration and is instantiated with `L::default()`.
fn no_recertification_after_replay<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"no_recertification_after_replay".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
// Shared log of every digest passed to the certifier closure, across both applications.
let certify_calls: Arc<Mutex<Vec<Sha256Digest>>> = Arc::new(Mutex::new(Vec::new()));
let tracker = certify_calls.clone();
let elector = L::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
// The voter under test is always participant 0.
let me = participants[0].clone();
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
// Record the digest and always approve.
should_certify: mocks::application::Certifier::Custom(Box::new(move |d| {
tracker.lock().push(d);
true
})),
};
let (actor, application) =
mocks::application::Application::new(context.with_label("app"), app_cfg);
actor.start();
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "no_recertification_after_replay".to_string(),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_millis(500),
certification_timeout: Duration::from_secs(1000),
timeout_retry: Duration::from_secs(1000),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.with_label("voter"), voter_cfg);
// The `_` patterns drop the resolver and network receivers immediately.
let (resolver_sender, _) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
let (vote_sender, _) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
// Keep the task handle so the first voter can be aborted later.
let handle = voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);
// Acknowledge the first batcher message if it is an update; anything else is ignored.
if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}
// Step 1: deliver a finalization for view 2 (parent view 1).
let view2 = View::new(2);
let proposal2 = Proposal::new(
Round::new(Epoch::new(333), view2),
View::new(1),
Sha256::hash(b"finalized_payload"),
);
let (_, finalization) = build_finalization(&schemes, &proposal2, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
// Acknowledge updates until one reports `finalized` at or beyond view 2.
loop {
if let batcher::Message::Update {
finalized,
response,
..
} = batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
if finalized >= view2 {
break;
}
}
}
assert_eq!(
certify_calls.lock().len(),
0,
"certify should not be called for finalization"
);
// Step 2: publish view 3's contents on the relay, then deliver its proposal and a
// notarization for it.
let view3 = View::new(3);
let digest3 = Sha256::hash(b"payload_for_certification");
let proposal3 = Proposal::new(Round::new(Epoch::new(333), view3), view2, digest3);
// Encoded tuple (round, view 2's payload digest, 0u64).
// NOTE(review): presumably the layout the mock application expects — confirm.
let contents = (proposal3.round, proposal2.payload, 0u64).encode();
relay.broadcast(&me, (digest3, contents));
mailbox.proposal(proposal3.clone()).await;
let (_, notarization) = build_notarization(&schemes, &proposal3, quorum);
mailbox
.recovered(Certificate::Notarization(notarization))
.await;
// Acknowledge updates until the voter reports a view later than 3.
loop {
if let batcher::Message::Update {
current, response, ..
} = batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
if current > view3 {
break;
}
}
}
assert_eq!(
certify_calls.lock().len(),
1,
"certify should be called once for notarization"
);
// Step 3: stop the first voter and bring up a replacement.
handle.abort();
// The replacement application appends to the same call log.
let tracker = certify_calls.clone();
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Custom(Box::new(move |d| {
tracker.lock().push(d);
true
})),
};
let (actor, application) =
mocks::application::Application::new(context.with_label("app2"), app_cfg);
actor.start();
// Same settings and, importantly, the same partition string as the first voter.
// NOTE(review): presumably this makes the new voter replay the first voter's persisted
// state — confirm against the actor's storage handling.
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "no_recertification_after_replay".to_string(),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_millis(500),
certification_timeout: Duration::from_secs(1000),
timeout_retry: Duration::from_secs(1000),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, _mailbox) = Actor::new(context.with_label("voter_restarted"), voter_cfg);
let (resolver_sender, _) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
// The restarted voter registers on channels 2 and 3 rather than 0 and 1.
let (vote_sender, _) = oracle
.control(me.clone())
.register(2, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(3, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);
// Acknowledge the restarted voter's first batcher message if it is an update.
if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}
// Give the restarted voter 100ms in which a repeated certify call would be recorded.
context.sleep(Duration::from_millis(100)).await;
assert_eq!(
certify_calls.lock().len(),
1,
"certify should not be called again after replay"
);
});
}
/// Runs `no_recertification_after_replay` once for every signing-scheme fixture.
///
/// The two `bls12381_threshold_vrf` fixtures are paired with the `Random` elector; every
/// other fixture is paired with `RoundRobin`.
#[test_traced]
fn test_no_recertification_after_replay() {
no_recertification_after_replay::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
no_recertification_after_replay::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
no_recertification_after_replay::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
no_recertification_after_replay::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
no_recertification_after_replay::<_, _, RoundRobin>(ed25519::fixture);
no_recertification_after_replay::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Starts a single voter (participant 0, `RoundRobin` elector) with observers attached to
/// the mock application's propose and verify hooks, advances it to view 2, and waits for
/// the voter's nullify vote for that view.
///
/// Once the nullify is seen, the recorded hook invocations must show that propose was
/// called for view 2 (the test's precondition) and that verify was never called for it.
///
/// `fixture` produces the participants and signing schemes for the scheme under test.
fn no_self_verify_when_proposing<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let validators = 5;
    let threshold = quorum(validators);
    let namespace = b"no_self_verify_when_proposing".to_vec();
    let partition = "no_self_verify_when_proposing".to_string();
    deterministic::Runner::timed(Duration::from_secs(10)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, validators);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let target_view = View::new(2);

        // The voter under test is always participant 0.
        let me = participants[0].clone();
        let elector = RoundRobin::<Sha256>::default();

        // Mock reporter.
        let reporter = mocks::reporter::Reporter::new(
            context.with_label("reporter"),
            mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[0].clone(),
                elector: elector.clone(),
            },
        );

        // Logs of the views for which the application's propose / verify hooks fired.
        let relay = Arc::new(mocks::relay::Relay::new());
        let propose_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let verify_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let propose_log = propose_calls.clone();
        let verify_log = verify_calls.clone();

        // Mock application with both observers installed before it is started.
        let (mut app_actor, application) = mocks::application::Application::new(
            context.with_label("app"),
            mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (1.0, 0.0),
                verify_latency: (1.0, 0.0),
                certify_latency: (1.0, 0.0),
                should_certify: mocks::application::Certifier::Always,
            },
        );
        app_actor.set_propose_observer(Box::new(move |ctx| {
            propose_log.lock().push(ctx.view())
        }));
        app_actor.set_verify_observer(Box::new(move |ctx, _| {
            verify_log.lock().push(ctx.view())
        }));
        app_actor.start();

        // Voter with a 500ms leader timeout and 1s certification timeout / retry.
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter,
            partition,
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1),
            timeout_retry: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.with_label("voter"), voter_cfg);

        // The `_` patterns drop the resolver and network receivers immediately.
        let (resolver_tx, _) = mpsc::channel(8);
        let (batcher_tx, mut batcher_receiver) = mpsc::channel(8);
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (cert_sender, _) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher::Mailbox::new(batcher_tx),
            resolver::Mailbox::new(resolver_tx),
            vote_sender,
            cert_sender,
        );

        // Skip constructed votes until the first update arrives, then acknowledge it.
        loop {
            if let batcher::Message::Update { response, .. } =
                batcher_receiver.recv().await.unwrap()
            {
                response.send(None).unwrap();
                break;
            }
        }

        advance_to_view(
            &mut mailbox,
            &mut batcher_receiver,
            &schemes,
            threshold,
            target_view,
        )
        .await;

        // Acknowledge updates until the voter's nullify for the target view shows up.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                batcher::Message::Constructed(vote) => {
                    if let Vote::Nullify(nullify) = vote {
                        if nullify.view() == target_view {
                            break;
                        }
                    }
                }
            }
        }

        // Both logs are locked for the duration of the assertions.
        let proposed = propose_calls.lock();
        let verified = verify_calls.lock();
        assert!(
            proposed.contains(&target_view),
            "test precondition: voter must have called automaton.propose for the leader-owned view (observed: {proposed:?})"
        );
        assert!(
            !verified.contains(&target_view),
            "voter must not verify its own leader-built proposal (observed: {verified:?})"
        );
    });
}
/// Runs `no_self_verify_when_proposing` once per signing-scheme fixture: BLS12-381 threshold
/// VRF and BLS12-381 multisig (each with the `MinPk` and `MinSig` variants), ed25519, and
/// secp256r1.
#[test_traced]
fn test_no_self_verify_when_proposing() {
    no_self_verify_when_proposing(bls12381_threshold_vrf::fixture::<MinPk, _>);
    no_self_verify_when_proposing(bls12381_threshold_vrf::fixture::<MinSig, _>);
    no_self_verify_when_proposing(bls12381_multisig::fixture::<MinPk, _>);
    no_self_verify_when_proposing(bls12381_multisig::fixture::<MinSig, _>);
    no_self_verify_when_proposing(ed25519::fixture);
    no_self_verify_when_proposing(secp256r1::fixture);
}
/// Restart regression test for a view in which this voter is the elected leader.
///
/// The test runs in two phases that share the same `partition` name:
///
/// 1. A first voter is driven to `target_view` (view 2) and run until it emits a notarize vote
///    for that view; the proposal carried by that vote is captured and the voter is aborted.
/// 2. A second voter is started with a fresh mock application whose propose/verify observers
///    record every view they are invoked for. The captured proposal is delivered again, and
///    once the voter emits a nullify vote for `target_view`, neither observer may have recorded
///    `target_view`.
///
/// `fixture` builds the participants and signing schemes for the scheme under test.
fn no_self_propose_or_verify_after_restart<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"no_self_propose_or_verify_after_restart".to_vec();
    let partition = "no_self_propose_or_verify_after_restart".to_string();
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        // The post-restart assertion below expects participant 0 to lead this view.
        let target_view = View::new(2);
        let me = participants[0].clone();
        let elector = RoundRobin::<Sha256>::default();
        let reporter_cfg = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[0].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
        let relay = Arc::new(mocks::relay::Relay::new());

        // Phase 1: mock application without observers, used only to reach the target view.
        let app_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (app_actor, application) =
            mocks::application::Application::new(context.with_label("app"), app_cfg);
        app_actor.start();
        // `elector`, `reporter` and `partition` are cloned here because the restarted voter
        // below consumes the originals.
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector: elector.clone(),
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: partition.clone(),
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1),
            timeout_retry: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.with_label("voter"), voter_cfg);
        // The resolver receiver and both network receivers are discarded; only batcher traffic
        // is inspected by this test.
        let (resolver_sender, _) = mpsc::channel(8);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (cert_sender, _) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        // Keep the handle so the first voter can be aborted later.
        let handle = voter.start(
            batcher::Mailbox::new(batcher_sender),
            resolver::Mailbox::new(resolver_sender),
            vote_sender,
            cert_sender,
        );

        // Reply to the first `Update` from the voter; earlier `Constructed` messages are ignored.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                    break;
                }
                batcher::Message::Constructed(_) => {}
            }
        }
        advance_to_view(
            &mut mailbox,
            &mut batcher_receiver,
            &schemes,
            quorum,
            target_view,
        )
        .await;

        // Capture the proposal from the voter's own notarize vote for the target view, replying
        // to any interleaved updates.
        let proposal = loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Constructed(Vote::Notarize(notarize))
                    if notarize.view() == target_view =>
                {
                    break notarize.proposal;
                }
                batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                batcher::Message::Constructed(_) => {}
            }
        };

        // Phase 2: stop the first voter and start a replacement on the same partition.
        handle.abort();
        // Views for which the restarted application is asked to propose / verify.
        let propose_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let verify_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let propose_tracker = propose_calls.clone();
        let verify_tracker = verify_calls.clone();
        let app_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (mut app_actor, application) = mocks::application::Application::new(
            context.with_label("app_restarted"),
            app_cfg,
        );
        app_actor
            .set_propose_observer(Box::new(move |ctx| propose_tracker.lock().push(ctx.view())));
        app_actor.set_verify_observer(Box::new(move |ctx, _| {
            verify_tracker.lock().push(ctx.view())
        }));
        app_actor.start();
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter,
            partition,
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1),
            timeout_retry: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) =
            Actor::new(context.with_label("voter_restarted"), voter_cfg);
        let (resolver_sender, _) = mpsc::channel(8);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        // The restarted voter registers on channels 2 and 3 rather than reusing 0 and 1.
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(2, TEST_QUOTA)
            .await
            .unwrap();
        let (cert_sender, _) = oracle
            .control(me.clone())
            .register(3, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher::Mailbox::new(batcher_sender),
            resolver::Mailbox::new(resolver_sender),
            vote_sender,
            cert_sender,
        );

        // The first update after restart must already report the target view, led by
        // participant 0.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Update {
                    current,
                    leader,
                    response,
                    ..
                } => {
                    response.send(None).unwrap();
                    assert_eq!(current, target_view);
                    assert_eq!(leader, Participant::new(0));
                    break;
                }
                batcher::Message::Constructed(_) => {}
            }
        }

        // Deliver the proposal captured before the restart.
        mailbox.proposal(proposal.clone()).await;

        // Wait until the restarted voter emits a nullify vote for the target view before
        // inspecting the observers.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Constructed(Vote::Nullify(nullify))
                    if nullify.view() == target_view =>
                {
                    break;
                }
                batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                batcher::Message::Constructed(_) => {}
            }
        }
        let proposed = propose_calls.lock();
        let verified = verify_calls.lock();
        assert!(
            !proposed.contains(&target_view),
            "voter must not propose for a leader-owned view after restart (observed: {proposed:?})"
        );
        assert!(
            !verified.contains(&target_view),
            "voter must not verify its own leader-built proposal after restart (observed: {verified:?})"
        );
    });
}
/// Runs `no_self_propose_or_verify_after_restart` once per signing-scheme fixture: BLS12-381
/// threshold VRF and BLS12-381 multisig (each with the `MinPk` and `MinSig` variants), ed25519,
/// and secp256r1.
#[test_traced]
fn test_no_self_propose_or_verify_after_restart() {
    no_self_propose_or_verify_after_restart(bls12381_threshold_vrf::fixture::<MinPk, _>);
    no_self_propose_or_verify_after_restart(bls12381_threshold_vrf::fixture::<MinSig, _>);
    no_self_propose_or_verify_after_restart(bls12381_multisig::fixture::<MinPk, _>);
    no_self_propose_or_verify_after_restart(bls12381_multisig::fixture::<MinSig, _>);
    no_self_propose_or_verify_after_restart(ed25519::fixture);
    no_self_propose_or_verify_after_restart(secp256r1::fixture);
}
/// Restart regression test for a view in which another participant is the leader.
///
/// The test runs in two phases that share the same `partition` name:
///
/// 1. A first voter is driven to `target_view` (view 3), handed a proposal attributed to
///    participant 1, and run until it emits a notarize vote for exactly that proposal; the
///    voter is then aborted.
/// 2. A second voter is started with a fresh mock application whose propose/verify observers
///    record every view they are invoked for. The same proposal is delivered again, and once
///    the voter emits a nullify vote for `target_view`, neither observer may have recorded
///    `target_view`.
///
/// `fixture` builds the participants and signing schemes for the scheme under test.
fn no_self_verify_after_restart<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"no_self_verify_after_restart".to_vec();
    let partition = "no_self_verify_after_restart".to_string();
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        // The post-restart assertion below expects participant `target_leader_idx` to lead
        // this view, while the voter under test runs as participant 0.
        let target_view = View::new(3);
        let target_leader_idx = 1usize;
        let me = participants[0].clone();
        let leader_pk = participants[target_leader_idx].clone();
        let elector = RoundRobin::<Sha256>::default();
        let reporter_cfg = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[0].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
        let relay = Arc::new(mocks::relay::Relay::new());

        // Phase 1: mock application without observers, used only to obtain the notarize vote.
        let app_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (app_actor, application) =
            mocks::application::Application::new(context.with_label("app"), app_cfg);
        app_actor.start();
        // `elector`, `reporter` and `partition` are cloned here because the restarted voter
        // below consumes the originals.
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector: elector.clone(),
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: partition.clone(),
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1),
            timeout_retry: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) = Actor::new(context.with_label("voter"), voter_cfg);
        // The resolver receiver and both network receivers are discarded; only batcher traffic
        // is inspected by this test.
        let (resolver_sender, _) = mpsc::channel(8);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (cert_sender, _) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        // Keep the handle so the first voter can be aborted later.
        let handle = voter.start(
            batcher::Mailbox::new(batcher_sender),
            resolver::Mailbox::new(resolver_sender),
            vote_sender,
            cert_sender,
        );

        // Reply to the first `Update` from the voter; earlier `Constructed` messages are ignored.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Update { response, .. } => {
                    response.send(None).unwrap();
                    break;
                }
                batcher::Message::Constructed(_) => {}
            }
        }
        // `advance_to_view` returns the payload used as the parent when encoding the proposal
        // contents below.
        let parent_payload = advance_to_view(
            &mut mailbox,
            &mut batcher_receiver,
            &schemes,
            quorum,
            target_view,
        )
        .await;

        // Hand-build the proposal for the target view on top of the previous view and publish
        // its contents on the relay under the leader's key before notifying the voter.
        // NOTE(review): presumably the mock application reads these contents from the relay
        // when asked to verify - confirm against `mocks::application`.
        let proposal = Proposal::new(
            Round::new(Epoch::new(333), target_view),
            target_view.previous().unwrap(),
            Sha256::hash(b"follower_proposal"),
        );
        let contents = (proposal.round, parent_payload, 0u64).encode();
        relay.broadcast(&leader_pk, (proposal.payload, contents));
        mailbox.proposal(proposal.clone()).await;

        // The first voter must notarize exactly the proposal it was handed.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Constructed(Vote::Notarize(notarize))
                    if notarize.view() == target_view =>
                {
                    assert_eq!(notarize.proposal, proposal);
                    break;
                }
                batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                batcher::Message::Constructed(_) => {}
            }
        }

        // Phase 2: stop the first voter and start a replacement on the same partition.
        handle.abort();
        // Views for which the restarted application is asked to propose / verify.
        let propose_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let verify_calls: Arc<Mutex<Vec<View>>> = Arc::new(Mutex::new(Vec::new()));
        let propose_tracker = propose_calls.clone();
        let verify_tracker = verify_calls.clone();
        let app_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (1.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (mut app_actor, application) = mocks::application::Application::new(
            context.with_label("app_restarted"),
            app_cfg,
        );
        app_actor
            .set_propose_observer(Box::new(move |ctx| propose_tracker.lock().push(ctx.view())));
        app_actor.set_verify_observer(Box::new(move |ctx, _| {
            verify_tracker.lock().push(ctx.view())
        }));
        app_actor.start();
        let voter_cfg = Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter,
            partition,
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_millis(500),
            certification_timeout: Duration::from_secs(1),
            timeout_retry: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (voter, mut mailbox) =
            Actor::new(context.with_label("voter_restarted"), voter_cfg);
        let (resolver_sender, _) = mpsc::channel(8);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
        // The restarted voter registers on channels 2 and 3 rather than reusing 0 and 1.
        let (vote_sender, _) = oracle
            .control(me.clone())
            .register(2, TEST_QUOTA)
            .await
            .unwrap();
        let (cert_sender, _) = oracle
            .control(me.clone())
            .register(3, TEST_QUOTA)
            .await
            .unwrap();
        voter.start(
            batcher::Mailbox::new(batcher_sender),
            resolver::Mailbox::new(resolver_sender),
            vote_sender,
            cert_sender,
        );

        // The first update after restart must already report the target view with the
        // expected leader.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Update {
                    current,
                    leader,
                    response,
                    ..
                } => {
                    response.send(None).unwrap();
                    assert_eq!(current, target_view);
                    assert_eq!(leader, Participant::from_usize(target_leader_idx));
                    break;
                }
                batcher::Message::Constructed(_) => {}
            }
        }

        // Deliver the same proposal again to the restarted voter.
        mailbox.proposal(proposal.clone()).await;

        // Wait until the restarted voter emits a nullify vote for the target view before
        // inspecting the observers.
        loop {
            match batcher_receiver.recv().await.unwrap() {
                batcher::Message::Constructed(Vote::Nullify(nullify))
                    if nullify.view() == target_view =>
                {
                    break;
                }
                batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                batcher::Message::Constructed(_) => {}
            }
        }
        let proposed = propose_calls.lock();
        let verified = verify_calls.lock();
        assert!(
            !verified.contains(&target_view),
            "voter must not request verification for a previously-voted view after restart (observed: {verified:?})"
        );
        assert!(
            !proposed.contains(&target_view),
            "voter must not propose for a previously-voted view after restart (observed: {proposed:?})"
        );
    });
}
/// Runs `no_self_verify_after_restart` once per signing-scheme fixture: BLS12-381 threshold VRF
/// and BLS12-381 multisig (each with the `MinPk` and `MinSig` variants), ed25519, and secp256r1.
#[test_traced]
fn test_no_self_verify_after_restart() {
    no_self_verify_after_restart(bls12381_threshold_vrf::fixture::<MinPk, _>);
    no_self_verify_after_restart(bls12381_threshold_vrf::fixture::<MinSig, _>);
    no_self_verify_after_restart(bls12381_multisig::fixture::<MinPk, _>);
    no_self_verify_after_restart(bls12381_multisig::fixture::<MinSig, _>);
    no_self_verify_after_restart(ed25519::fixture);
    no_self_verify_after_restart(secp256r1::fixture);
}
fn certification_cancelled_on_finalization<S, F, L>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
L: ElectorConfig<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"consensus".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let me = participants[0].clone();
let elector = L::default();
let reporter_config = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (2_000.0, 0.0), should_certify: mocks::application::Certifier::Always,
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let cfg = Config {
scheme: schemes[0].clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: "cert_cancel_test".to_string(),
epoch: Epoch::new(333),
mailbox_size: 128,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (actor, mut mailbox) = Actor::new(context.clone(), cfg);
let (resolver_sender, mut resolver_receiver) = mpsc::channel(10);
let resolver = resolver::Mailbox::new(resolver_sender);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(1024);
let batcher = batcher::Mailbox::new(batcher_sender);
let (vote_sender, _vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (certificate_sender, _certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
actor.start(batcher, resolver, vote_sender, certificate_sender);
if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}
let view5 = View::new(5);
let digest5 = Sha256::hash(b"payload_to_certify");
let proposal5 =
Proposal::new(Round::new(Epoch::new(333), view5), View::new(0), digest5);
let contents = (proposal5.round, Sha256::hash(b"genesis"), 42u64).encode();
relay.broadcast(&me, (digest5, contents));
mailbox.proposal(proposal5.clone()).await;
let (_, notarization) = build_notarization(&schemes, &proposal5, quorum);
mailbox
.recovered(Certificate::Notarization(notarization))
.await;
context.sleep(Duration::from_millis(100)).await;
let (_, finalization) = build_finalization(&schemes, &proposal5, quorum);
mailbox
.recovered(Certificate::Finalization(finalization))
.await;
loop {
if let batcher::Message::Update {
finalized,
response,
..
} = batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
if finalized >= view5 {
break;
}
}
}
loop {
let msg = resolver_receiver
.recv()
.await
.expect("expected resolver msg");
match msg {
MailboxMessage::Certificate(Certificate::Finalization(f)) => {
assert_eq!(f.view(), view5);
break;
}
MailboxMessage::Certificate(_) => continue,
MailboxMessage::Certified { .. } => {
panic!("unexpected Certified message before finalization processed")
}
}
}
let certified_received = select! {
msg = resolver_receiver.recv() => {
matches!(msg, Some(MailboxMessage::Certified { .. }))
},
_ = context.sleep(Duration::from_secs(4)) => false,
};
assert!(
!certified_received,
"Certified message should NOT have been sent - certification should be cancelled"
);
});
}
/// Runs `certification_cancelled_on_finalization` once per signing-scheme fixture. The
/// BLS12-381 threshold VRF fixtures use the `Random` elector; the multisig, ed25519 and
/// secp256r1 fixtures use `RoundRobin`.
#[test_traced]
fn test_certification_cancelled_on_finalization() {
    certification_cancelled_on_finalization::<_, _, Random>(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    certification_cancelled_on_finalization::<_, _, Random>(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    certification_cancelled_on_finalization::<_, _, RoundRobin>(
        bls12381_multisig::fixture::<MinPk, _>,
    );
    certification_cancelled_on_finalization::<_, _, RoundRobin>(
        bls12381_multisig::fixture::<MinSig, _>,
    );
    certification_cancelled_on_finalization::<_, _, RoundRobin>(ed25519::fixture);
    certification_cancelled_on_finalization::<_, _, RoundRobin>(secp256r1::fixture);
}
/// Checks that the resolver still receives a successful `Certified` report for a view that was
/// nullified while its certification was outstanding.
///
/// The voter is handed a proposal and a notarization for view 5, then (100ms later) a
/// nullification for the same view. The mock application uses a `certify_latency` of
/// `(2_000.0, 0.0)`, far larger than the other latencies. The test then polls the resolver and
/// batcher channels and expects `Certified { view: 5, success: true }` on the resolver channel.
///
/// * `fixture` builds the participants and signing schemes for the scheme under test.
/// * `L` selects the elector configuration, instantiated through `L::default()`.
fn certification_still_reports_to_resolver_after_nullification<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let n = 5;
    let quorum = quorum(n);
    let namespace = b"consensus".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(30));
    executor.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;
        let me = participants[0].clone();
        let elector = L::default();
        let reporter_config = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[0].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
        let relay = Arc::new(mocks::relay::Relay::new());
        // Certification is configured to be much slower than propose/verify so that the
        // nullification below is delivered while it is still pending.
        // NOTE(review): `certify_latency` is presumably (mean, stddev) in milliseconds - confirm
        // against the mock application.
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (1.0, 0.0),
            verify_latency: (1.0, 0.0),
            certify_latency: (2_000.0, 0.0), should_certify: mocks::application::Certifier::Always,
        };
        let (actor, application) = mocks::application::Application::new(
            context.with_label("application"),
            application_cfg,
        );
        actor.start();
        let cfg = Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            partition: "cert_after_nullification_test".to_string(),
            epoch: Epoch::new(333),
            mailbox_size: 128,
            leader_timeout: Duration::from_secs(5),
            certification_timeout: Duration::from_secs(5),
            timeout_retry: Duration::from_mins(60),
            activity_timeout: ViewDelta::new(10),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let (actor, mut mailbox) = Actor::new(context.clone(), cfg);
        let (resolver_sender, mut resolver_receiver) = mpsc::channel(10);
        let resolver = resolver::Mailbox::new(resolver_sender);
        let (batcher_sender, mut batcher_receiver) = mpsc::channel(1024);
        let batcher = batcher::Mailbox::new(batcher_sender);
        let (vote_sender, _vote_receiver) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (certificate_sender, _certificate_receiver) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        actor.start(batcher, resolver, vote_sender, certificate_sender);

        // Reply to the voter's first batcher message if it is an update.
        if let batcher::Message::Update { response, .. } =
            batcher_receiver.recv().await.unwrap()
        {
            response.send(None).unwrap();
        }

        // Build a proposal for view 5 whose parent is view 0 and publish its contents on the
        // relay under our own key before notifying the voter.
        let view5 = View::new(5);
        let digest5 = Sha256::hash(b"payload_to_certify");
        let proposal5 =
            Proposal::new(Round::new(Epoch::new(333), view5), View::new(0), digest5);
        let contents = (proposal5.round, Sha256::hash(b"genesis"), 42u64).encode();
        relay.broadcast(&me, (digest5, contents));
        mailbox.proposal(proposal5.clone()).await;

        // Notarize the proposal, then nullify the same view shortly afterwards.
        let (_, notarization) = build_notarization(&schemes, &proposal5, quorum);
        mailbox
            .recovered(Certificate::Notarization(notarization))
            .await;
        context.sleep(Duration::from_millis(100)).await;
        let (_, nullification) =
            build_nullification(&schemes, Round::new(Epoch::new(333), view5), quorum);
        mailbox
            .recovered(Certificate::Nullification(nullification))
            .await;

        // Poll both channels: batcher updates are answered so the voter is not left waiting,
        // and the loop ends on the `Certified` report for view 5 or after 6 seconds without any
        // message (the sleep is re-armed on every iteration).
        let reported = loop {
            select! {
                msg = resolver_receiver.recv() => match msg.unwrap() {
                    MailboxMessage::Certified { view, success } if view == view5 =>
                        break Some(success),
                    MailboxMessage::Certified { .. } | MailboxMessage::Certificate(_) => {}
                },
                msg = batcher_receiver.recv() => {
                    if let batcher::Message::Update { response, .. } = msg.unwrap() {
                        response.send(None).unwrap();
                    }
                },
                _ = context.sleep(Duration::from_secs(6)) => {
                    break None;
                },
            }
        };
        assert_eq!(
            reported,
            Some(true),
            "expected resolver to receive successful certification after nullification"
        );
    });
}
/// Runs `certification_still_reports_to_resolver_after_nullification` once per signing-scheme
/// fixture. The BLS12-381 threshold VRF fixtures use the `Random` elector; the multisig,
/// ed25519 and secp256r1 fixtures use `RoundRobin`.
#[test_traced]
fn test_certification_still_reports_to_resolver_after_nullification() {
    certification_still_reports_to_resolver_after_nullification::<_, _, Random>(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    certification_still_reports_to_resolver_after_nullification::<_, _, Random>(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    certification_still_reports_to_resolver_after_nullification::<_, _, RoundRobin>(
        bls12381_multisig::fixture::<MinPk, _>,
    );
    certification_still_reports_to_resolver_after_nullification::<_, _, RoundRobin>(
        bls12381_multisig::fixture::<MinSig, _>,
    );
    certification_still_reports_to_resolver_after_nullification::<_, _, RoundRobin>(
        ed25519::fixture,
    );
    certification_still_reports_to_resolver_after_nullification::<_, _, RoundRobin>(
        secp256r1::fixture,
    );
}
/// Checks that a notarization delivered after the same view was already nullified still
/// results in a successful `Certified` report on the resolver channel.
///
/// The voter is advanced to view 3, handed a nullification for that view through
/// `Mailbox::resolved`, and then a notarization for a proposal in the same view. The test polls
/// the resolver and batcher channels and expects `Certified { view: 3, success: true }`.
///
/// `fixture` builds the participants and signing schemes for the scheme under test.
fn late_notarization_after_nullification_still_certifies<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let threshold = quorum(n);
    let namespace = b"late_notarization_after_nullification".to_vec();
    let runner = deterministic::Runner::timed(Duration::from_secs(30));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Spin up the voter; the last two values returned by `setup_voter` are not needed here.
        let (mut mailbox, mut batcher_rx, mut resolver_rx, _, _) = setup_voter(
            &mut context,
            &oracle,
            &participants,
            &schemes,
            RoundRobin::<Sha256>::default(),
            Duration::from_secs(5),
            Duration::from_secs(5),
            Duration::from_secs(5),
            mocks::application::Certifier::Always,
        )
        .await;

        let target_view = View::new(3);
        let target_round = Round::new(Epoch::new(333), target_view);
        advance_to_view(
            &mut mailbox,
            &mut batcher_rx,
            &schemes,
            threshold,
            target_view,
        )
        .await;

        // Nullify the target view first...
        let (_, nullification) = build_nullification(&schemes, target_round, threshold);
        mailbox
            .resolved(Certificate::Nullification(nullification))
            .await;

        // ...and only then deliver a notarization for a proposal in that same view.
        let late_proposal = Proposal::new(
            target_round,
            target_view.previous().unwrap(),
            Sha256::hash(b"late_notarization_after_nullification"),
        );
        let (_, notarization) = build_notarization(&schemes, &late_proposal, threshold);
        mailbox
            .resolved(Certificate::Notarization(notarization))
            .await;

        // Poll both channels until the certification report for the target view arrives, or
        // until 6 seconds pass without any message.
        let outcome = loop {
            select! {
                msg = resolver_rx.recv() => match msg.unwrap() {
                    MailboxMessage::Certified { view, success } if view == target_view => {
                        break Some(success);
                    }
                    MailboxMessage::Certified { .. } | MailboxMessage::Certificate(_) => {}
                },
                msg = batcher_rx.recv() => match msg.unwrap() {
                    // Answer updates so the voter is not left waiting on the batcher.
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(6)) => break None,
            }
        };
        assert_eq!(
            outcome,
            Some(true),
            "expected notarization after nullification to still trigger certification"
        );
    });
}
/// Runs `late_notarization_after_nullification_still_certifies` once per signing-scheme
/// fixture: BLS12-381 threshold VRF and BLS12-381 multisig (each with the `MinPk` and `MinSig`
/// variants), ed25519, and secp256r1.
#[test_traced]
fn test_late_notarization_after_nullification_still_certifies() {
    // BLS12-381 threshold VRF
    late_notarization_after_nullification_still_certifies(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    late_notarization_after_nullification_still_certifies(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );

    // BLS12-381 multisig
    late_notarization_after_nullification_still_certifies(bls12381_multisig::fixture::<MinPk, _>);
    late_notarization_after_nullification_still_certifies(
        bls12381_multisig::fixture::<MinSig, _>,
    );

    // Non-aggregating schemes
    late_notarization_after_nullification_still_certifies(ed25519::fixture);
    late_notarization_after_nullification_still_certifies(secp256r1::fixture);
}
/// Checks that the voter still advances past a view after it has already voted to nullify that
/// view, once a notarization for the view is recovered.
///
/// The voter (participant 0) is advanced to view 3, where the round-robin elector must pick a
/// different leader. No proposal is delivered, and the test waits up to 15 seconds for the
/// voter's nullify vote for view 3. A notarization for a proposal in view 3 is then recovered,
/// and the batcher must report a current view greater than 3 within 5 seconds.
///
/// `fixture` builds the participants and signing schemes for the scheme under test.
fn certification_after_timeout<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let threshold = quorum(n);
    let namespace = b"certification_after_timeout".to_vec();
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Build a standalone elector so the test can check who leads the target view.
        let elector = RoundRobin::<Sha256>::default();
        let leader_schedule: RoundRobinElector<S> = elector
            .clone()
            .build(&participants.clone().try_into().unwrap());

        let (mut mailbox, mut batcher_rx, _, _, _) = setup_voter(
            &mut context,
            &oracle,
            &participants,
            &schemes,
            elector,
            Duration::from_secs(10),
            Duration::from_secs(10),
            Duration::from_secs(100),
            mocks::application::Certifier::Always,
        )
        .await;

        let target_view = View::new(3);
        advance_to_view(
            &mut mailbox,
            &mut batcher_rx,
            &schemes,
            threshold,
            target_view,
        )
        .await;

        // Precondition: some other participant leads the target view.
        let target_round = Round::new(Epoch::new(333), target_view);
        assert_ne!(
            leader_schedule.elect(target_round, None),
            Participant::new(0),
            "we should not be leader at view 3"
        );

        // Without a proposal, the voter is expected to emit a nullify vote for the target view.
        loop {
            select! {
                msg = batcher_rx.recv() => match msg.unwrap() {
                    batcher::Message::Constructed(Vote::Nullify(nullify))
                        if nullify.view() == target_view =>
                    {
                        break;
                    }
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(15)) => {
                    panic!("expected nullify vote");
                },
            }
        }

        // Recover a notarization for the view that was just nullified locally.
        let notarized_proposal = Proposal::new(
            target_round,
            target_view.previous().unwrap(),
            Sha256::hash(b"timeout_test"),
        );
        let (_, notarization) = build_notarization(&schemes, &notarized_proposal, threshold);
        mailbox
            .recovered(Certificate::Notarization(notarization))
            .await;

        // The batcher must be told about a view beyond the target one.
        let advanced = loop {
            select! {
                msg = batcher_rx.recv() => match msg.unwrap() {
                    batcher::Message::Update { current, response, .. } => {
                        response.send(None).unwrap();
                        if current > target_view {
                            break true;
                        }
                    }
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    break false;
                },
            }
        };
        assert!(
            advanced,
            "view should advance after certification (timeout case)"
        );
    });
}
/// Runs `certification_after_timeout` once per signing-scheme fixture: BLS12-381 threshold VRF
/// and BLS12-381 multisig (each with the `MinPk` and `MinSig` variants), ed25519, and secp256r1.
#[test_traced]
fn test_certification_after_timeout() {
    // BLS12-381 threshold VRF
    certification_after_timeout(bls12381_threshold_vrf::fixture::<MinPk, _>);
    certification_after_timeout(bls12381_threshold_vrf::fixture::<MinSig, _>);

    // BLS12-381 multisig
    certification_after_timeout(bls12381_multisig::fixture::<MinPk, _>);
    certification_after_timeout(bls12381_multisig::fixture::<MinSig, _>);

    // Non-aggregating schemes
    certification_after_timeout(ed25519::fixture);
    certification_after_timeout(secp256r1::fixture);
}
/// Checks that a follower which first notarized a proposal and later voted to nullify the same
/// view still advances once a notarization for that proposal is recovered.
///
/// The voter (participant 0) is advanced to view 3, where the round-robin elector must pick a
/// different leader. A proposal attributed to participant 1 is published on the relay and
/// delivered, and the test waits for the voter's notarize vote. After sleeping 11 seconds it
/// waits for the voter's nullify vote for view 3, then recovers a notarization for the
/// proposal. The batcher must report a current view greater than 3 within 5 seconds.
///
/// `fixture` builds the participants and signing schemes for the scheme under test.
fn certification_after_notarize_timeout_as_follower<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let threshold = quorum(n);
    let namespace = b"certification_after_notarize_timeout_as_follower".to_vec();
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Build a standalone elector so the test can check who leads the target view.
        let elector = RoundRobin::<Sha256>::default();
        let leader_schedule: RoundRobinElector<S> = elector
            .clone()
            .build(&participants.clone().try_into().unwrap());

        let (mut mailbox, mut batcher_rx, _, relay, _) = setup_voter(
            &mut context,
            &oracle,
            &participants,
            &schemes,
            elector,
            Duration::from_secs(10),
            Duration::from_secs(10),
            Duration::from_secs(100),
            mocks::application::Certifier::Always,
        )
        .await;

        let target_view = View::new(3);
        let parent_payload = advance_to_view(
            &mut mailbox,
            &mut batcher_rx,
            &schemes,
            threshold,
            target_view,
        )
        .await;

        // Precondition: some other participant leads the target view.
        let target_round = Round::new(Epoch::new(333), target_view);
        assert_ne!(
            leader_schedule.elect(target_round, None),
            Participant::new(0),
            "we should not be leader at view 3"
        );

        // Publish the proposal contents on the relay under participant 1's key, then hand the
        // proposal to the voter.
        let proposal = Proposal::new(
            target_round,
            target_view.previous().unwrap(),
            Sha256::hash(b"follower_test"),
        );
        let leader_pk = participants[1].clone();
        let encoded = (proposal.round, parent_payload, 0u64).encode();
        relay.broadcast(&leader_pk, (proposal.payload, encoded));
        mailbox.proposal(proposal.clone()).await;

        // Step 1: the voter notarizes the proposal.
        loop {
            select! {
                msg = batcher_rx.recv() => match msg.unwrap() {
                    batcher::Message::Constructed(Vote::Notarize(notarize))
                        if notarize.view() == target_view =>
                    {
                        break;
                    }
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("expected notarize vote");
                },
            }
        }

        // Step 2: let 11 seconds pass, after which a nullify vote for the same view must be
        // available within one more second.
        context.sleep(Duration::from_secs(11)).await;
        loop {
            select! {
                msg = batcher_rx.recv() => match msg.unwrap() {
                    batcher::Message::Constructed(Vote::Nullify(nullify))
                        if nullify.view() == target_view =>
                    {
                        break;
                    }
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(1)) => {
                    panic!("expected nullify vote");
                },
            }
        }

        // Step 3: recover a notarization for the proposal and expect the view to advance.
        let (_, notarization) = build_notarization(&schemes, &proposal, threshold);
        mailbox
            .recovered(Certificate::Notarization(notarization))
            .await;
        let advanced = loop {
            select! {
                msg = batcher_rx.recv() => match msg.unwrap() {
                    batcher::Message::Update { current, response, .. } => {
                        response.send(None).unwrap();
                        if current > target_view {
                            break true;
                        }
                    }
                    batcher::Message::Constructed(_) => {}
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    break false;
                },
            }
        };
        assert!(
            advanced,
            "view should advance after certification (follower case)"
        );
    });
}
/// Runs `certification_after_notarize_timeout_as_follower` once per signing-scheme fixture:
/// BLS12-381 threshold VRF and BLS12-381 multisig (each with the `MinPk` and `MinSig`
/// variants), ed25519, and secp256r1.
#[test_traced]
fn test_certification_after_notarize_timeout_as_follower() {
    // BLS12-381 threshold VRF
    certification_after_notarize_timeout_as_follower(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    certification_after_notarize_timeout_as_follower(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );

    // BLS12-381 multisig
    certification_after_notarize_timeout_as_follower(bls12381_multisig::fixture::<MinPk, _>);
    certification_after_notarize_timeout_as_follower(bls12381_multisig::fixture::<MinSig, _>);

    // Non-aggregating schemes
    certification_after_notarize_timeout_as_follower(ed25519::fixture);
    certification_after_notarize_timeout_as_follower(secp256r1::fixture);
}
/// Exercises the "notarize, time out, then receive a notarization" path while the voter
/// under test is the elected leader of the target view.
///
/// Steps performed below:
/// 1. Advance the voter to view 2 and assert the round-robin elector selects participant 0.
/// 2. Wait for the voter's notarize vote for that view and capture the proposal it carries.
/// 3. Sleep 11 seconds, then expect a nullify vote for the same view.
/// 4. Deliver a notarization certificate for the captured proposal via `mailbox.recovered`.
/// 5. Require a batcher `Update` whose `current` view is greater than the target view.
///
/// `fixture` produces the participants and per-participant schemes for scheme `S`.
fn certification_after_notarize_timeout_as_leader<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"certification_after_notarize_timeout_as_leader".to_vec();
// The whole scenario must finish within 60 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(60));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = RoundRobin::<Sha256>::default();
// Build a standalone elector so the test can check who leads the target view.
let built_elector: RoundRobinElector<S> = elector
.clone()
.build(&participants.clone().try_into().unwrap());
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, _, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_secs(10),
Duration::from_secs(10),
Duration::from_secs(100),
mocks::application::Certifier::Always,
)
.await;
let target_view = View::new(2);
advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Sanity check the premise of the test: participant 0 leads the target view.
assert_eq!(
built_elector.elect(Round::new(Epoch::new(333), target_view), None),
Participant::new(0),
"we should be leader at view 2"
);
// Wait (up to 5 seconds) for the notarize vote for the target view and keep its
// proposal; batcher `Update` requests are answered with `None` along the way.
let proposal = loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Notarize(n))
if n.view() == target_view =>
{
break n.proposal.clone();
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("expected notarize vote as leader");
},
}
};
// Sleep past the 10 second timeouts configured above.
context.sleep(Duration::from_secs(11)).await;
// A nullify vote for the target view must now show up within 1 second.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(n))
if n.view() == target_view =>
break,
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(1)) => {
panic!("expected nullify vote");
},
}
}
// Hand the voter a notarization certificate for the proposal it voted on.
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Notarization(notarization))
.await;
// `advanced` becomes true once an `Update` reports a view beyond the target view,
// or false if none arrives before the 5 second sleep arm fires.
let advanced = loop {
select! {
msg = batcher_receiver.recv() => {
if let batcher::Message::Update {
current, response, ..
} = msg.unwrap()
{
response.send(None).unwrap();
if current > target_view {
break true;
}
}
},
_ = context.sleep(Duration::from_secs(5)) => {
break false;
},
}
};
assert!(
advanced,
"view should advance after certification (leader case)"
);
});
}
/// Runs the leader "certification after notarize timeout" scenario once for every
/// signing-scheme fixture exercised by this test module.
#[test_traced]
fn test_certification_after_notarize_timeout_as_leader() {
    // Generic arguments are fully inferred from the fixture function passed in.
    certification_after_notarize_timeout_as_leader(bls12381_threshold_vrf::fixture::<MinPk, _>);
    certification_after_notarize_timeout_as_leader(bls12381_threshold_vrf::fixture::<MinSig, _>);
    certification_after_notarize_timeout_as_leader(bls12381_multisig::fixture::<MinPk, _>);
    certification_after_notarize_timeout_as_leader(bls12381_multisig::fixture::<MinSig, _>);
    certification_after_notarize_timeout_as_leader(ed25519::fixture);
    certification_after_notarize_timeout_as_leader(secp256r1::fixture);
}
/// Checks that a voter whose application uses `Certifier::Cancel` still emits a nullify
/// vote for the affected view, and that the failure is logged.
///
/// The voter is advanced to view 3, given a proposal plus a notarization certificate for
/// it (via `mailbox.resolved`), and must then produce a nullify vote for view 3 within
/// 5 seconds. Afterwards the collected DEBUG traces must contain a
/// "failed to certify proposal" event with `err == "RecvError(())"` and the expected
/// `round` field.
///
/// `fixture` produces the participants and schemes for scheme `S`; `traces` is the trace
/// storage inspected at the end.
fn cancelled_certification_does_not_hang<S, F>(mut fixture: F, traces: TraceStorage)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"consensus".to_vec();
// The whole scenario must finish within 10 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, relay, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_millis(500),
Duration::from_millis(500),
Duration::from_mins(60),
mocks::application::Certifier::Cancel,
)
.await;
let target_view = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view whose parent is the immediately preceding view.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"test_proposal"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal itself to the voter.
let leader = participants[1].clone();
let contents = (proposal.round, parent_payload, 0u64).encode();
relay
.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal.clone()).await;
// Deliver a notarization certificate for that proposal.
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Notarization(notarization))
.await;
// Wait (up to 5 seconds) for a nullify vote for the target view, answering batcher
// `Update` requests with `None` in the meantime.
loop {
select! {
msg = batcher_receiver.recv() => {
match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify)) if nullify.view() == target_view => {
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
}
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!(
"voter should emit nullify for view {target_view} despite cancelled certification",
);
},
}
}
// The logged `round` field is compared against this exact rendering.
let expected_round = format!("Round {{ epoch: Epoch(333), view: View({target_view}) }}");
// NOTE(review): callers pass clones of one `TraceStorage` to several runs; if clones
// share storage, an event from an earlier run could satisfy this check — confirm.
traces
.get_by_level(Level::DEBUG)
.expect_event(|event| {
event.metadata.content == "failed to certify proposal"
&& event
.metadata
.fields
.iter()
.any(|(name, value)| name == "err" && value == "RecvError(())")
&& event
.metadata
.fields
.iter()
.any(|(name, value)| name == "round" && value == &expected_round)
})
.unwrap();
});
}
/// Runs the cancelled-certification scenario once for every signing-scheme fixture,
/// handing each run a handle to the collected trace storage.
#[test_collect_traces]
fn test_cancelled_certification_does_not_hang(traces: TraceStorage) {
    // Every run except the last receives a clone; the last run takes ownership of `traces`.
    cancelled_certification_does_not_hang::<_, _>(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
        traces.clone(),
    );
    cancelled_certification_does_not_hang::<_, _>(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
        traces.clone(),
    );
    cancelled_certification_does_not_hang::<_, _>(
        bls12381_multisig::fixture::<MinPk, _>,
        traces.clone(),
    );
    cancelled_certification_does_not_hang::<_, _>(
        bls12381_multisig::fixture::<MinSig, _>,
        traces.clone(),
    );
    cancelled_certification_does_not_hang::<_, _>(ed25519::fixture, traces.clone());
    cancelled_certification_does_not_hang::<_, _>(secp256r1::fixture, traces);
}
/// Checks that a view whose certification was cancelled is certified successfully after
/// the voter is restarted on the same storage partition.
///
/// Phase 1 starts a voter backed by an application using `Certifier::Cancel`, advances it
/// to view 3, delivers a proposal and its notarization, and asserts that the view does not
/// advance. Phase 2 aborts that voter and starts a second one with the same `partition`
/// but an application using `Certifier::Always`. The second voter must report
/// `MailboxMessage::Certified { view: 3, success: true }` to the resolver without first
/// constructing a nullify vote for view 3, and the reporter must record neither nullify
/// votes nor a nullification for that view.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn cancelled_certification_recertifies_after_restart<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"cancelled_cert_restart_recertify".to_vec();
// The whole scenario must finish within 20 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(20));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
// The voter under test acts as participant 0.
let me = participants[0].clone();
let elector = RoundRobin::<Sha256>::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());
// Both voter instances below use this partition name.
let partition = "cancelled_certification_recertifies_after_restart".to_string();
let epoch = Epoch::new(333);
// Phase 1 application: certification requests are handled with `Certifier::Cancel`.
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Cancel,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app_cancel"), app_cfg);
app_actor.start();
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: partition.clone(),
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.with_label("voter_cancel"), voter_cfg);
// Test-owned channels stand in for the resolver and batcher actors.
let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
// The first voter registers on network channels 0 and 1.
let (vote_sender, _) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
// Keep the handle so the first voter can be aborted later.
let handle = voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);
// Answer the first batcher message if it is an `Update`.
if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}
let target_view = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view whose parent is the immediately preceding view.
let proposal = Proposal::new(
Round::new(epoch, target_view),
target_view.previous().unwrap(),
Sha256::hash(b"restart_recertify_payload"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal and a notarization for it to the voter.
let leader = participants[1].clone();
let contents = (proposal.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal.clone()).await;
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Notarization(notarization))
.await;
context.sleep(Duration::from_millis(200)).await;
// Inspect at most one batcher message (or wait 200ms): only an `Update` reporting a
// view beyond the target counts as having advanced.
let advanced_before_restart = select! {
msg = batcher_receiver.recv() => {
if let batcher::Message::Update {
current, response, ..
} = msg.unwrap()
{
response.send(None).unwrap();
current > target_view
} else {
false
}
},
_ = context.sleep(Duration::from_millis(200)) => false,
};
assert!(
!advanced_before_restart,
"view should not advance before restart when certification receiver is canceled"
);
// Stop the first voter.
handle.abort();
// Phase 2 application: certification uses `Certifier::Always` with a larger
// `certify_latency` than phase 1.
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (2_000.0, 0.0), should_certify: mocks::application::Certifier::Always,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app_restarted"), app_cfg);
app_actor.start();
// Same partition and epoch as the first voter.
// NOTE(review): presumably this lets the restarted voter recover persisted state — confirm.
let voter_cfg = Config {
scheme: schemes[0].clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition,
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
// `_mailbox` is bound (not discarded) so it stays alive for the rest of the test.
let (voter, _mailbox) = Actor::new(context.with_label("voter_restarted"), voter_cfg);
let (resolver_sender, mut resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
// The restarted voter registers on network channels 2 and 3.
let (vote_sender, _) = oracle
.control(me.clone())
.register(2, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(3, TEST_QUOTA)
.await
.unwrap();
voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);
// Answer the first batcher message if it is an `Update`.
if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}
// Wait (up to 5 seconds) for the resolver to be told the target view was certified.
// A nullify vote for the target view arriving first fails the test.
loop {
select! {
msg = resolver_receiver.recv() => match msg.unwrap() {
MailboxMessage::Certified { view, success } if view == target_view => {
assert!(success, "expected successful certification after restart for canceled certification view");
break;
}
MailboxMessage::Certified { .. } | MailboxMessage::Certificate(_) => {}
},
msg = batcher_receiver.recv() => {
match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
panic!("unexpected immediate nullify for view {target_view} after restart");
}
batcher::Message::Update { response, .. } => {
response.send(None).unwrap();
}
_ => {}
}
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!(
"timed out waiting for successful certification for restarted view {target_view}"
);
},
}
};
// Give the reporter a short window, then confirm it saw no nullify activity for the view.
context.sleep(Duration::from_millis(100)).await;
assert!(
!reporter.nullifies.lock().contains_key(&target_view),
"did not expect nullify votes for restarted view {target_view}"
);
assert!(
!reporter.nullifications.lock().contains_key(&target_view),
"did not expect nullification certificate for restarted view {target_view}"
);
});
}
/// Runs the restart-and-recertify scenario once for every signing-scheme fixture
/// exercised by this test module.
#[test_traced]
fn test_cancelled_certification_recertifies_after_restart() {
    // Generic arguments are fully inferred from the fixture function passed in.
    cancelled_certification_recertifies_after_restart(bls12381_threshold_vrf::fixture::<MinPk, _>);
    cancelled_certification_recertifies_after_restart(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    cancelled_certification_recertifies_after_restart(bls12381_multisig::fixture::<MinPk, _>);
    cancelled_certification_recertifies_after_restart(bls12381_multisig::fixture::<MinSig, _>);
    cancelled_certification_recertifies_after_restart(ed25519::fixture);
    cancelled_certification_recertifies_after_restart(secp256r1::fixture);
}
/// Checks that a voter stuck on view 3 (application uses `Certifier::Cancel`) is not moved
/// forward by a notarization for view 4, but is moved forward by a finalization for view 4.
///
/// Steps performed below:
/// 1. Advance to view 3, deliver a proposal and its notarization, and wait for the
///    voter's nullify vote for view 3.
/// 2. Deliver a notarization for a view-4 proposal whose parent is view 3 and assert that
///    no batcher `Update` reports a view beyond 3 within 5 seconds.
/// 3. Deliver a finalization for the same view-4 proposal and assert that an `Update`
///    reports a view beyond 4 within 5 seconds.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn only_finalization_rescues_validator<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 4;
let quorum = quorum(n);
let namespace = b"future_notarization_no_rescue".to_vec();
// The whole scenario must finish within 60 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(60));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, relay, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector.clone(),
Duration::from_secs(2),
Duration::from_secs(3),
Duration::from_secs(1),
mocks::application::Certifier::Cancel,
)
.await;
let view_3 = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
view_3,
)
.await;
// Proposal for view 3 whose parent is view 2.
let proposal_3 = Proposal::new(
Round::new(Epoch::new(333), view_3),
view_3.previous().unwrap(),
Sha256::hash(b"view_3_proposal"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal and a notarization for it to the voter.
let leader = participants[1].clone();
let contents = (proposal_3.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal_3.payload, contents));
mailbox.proposal(proposal_3.clone()).await;
let (_, notarization_3) = build_notarization(&schemes, &proposal_3, quorum);
mailbox
.resolved(Certificate::Notarization(notarization_3))
.await;
// Wait (up to 10 seconds) for the voter's nullify vote for view 3.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == view_3 =>
break,
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(10)) => {
panic!("expected nullify vote for view 3");
},
}
}
// Proposal for view 4 whose parent is view 3. Only certificates for it are delivered;
// the proposal itself is never passed to `mailbox.proposal`.
let view_4 = View::new(4);
let proposal_4 = Proposal::new(
Round::new(Epoch::new(333), view_4),
view_3, Sha256::hash(b"view_4_proposal"),
);
let (_, notarization_4) = build_notarization(&schemes, &proposal_4, quorum);
mailbox
.resolved(Certificate::Notarization(notarization_4))
.await;
// Watch for 5 seconds: any `Update` beyond view 3 means the voter advanced. Every
// nullify vote seen in this window must be for view 3.
let advanced = loop {
select! {
msg = batcher_receiver.recv() => {
match msg.unwrap() {
batcher::Message::Update {
current, response, ..
} => {
response.send(None).unwrap();
if current > view_3 {
break true;
}
}
batcher::Message::Constructed(Vote::Nullify(n)) => {
assert_eq!(
n.view(),
view_3,
"should only vote nullify for stuck view"
);
}
_ => {}
}
},
_ = context.sleep(Duration::from_secs(5)) => {
break false;
},
}
};
assert!(
!advanced,
"receiving a notarization for view 4 should NOT rescue the stuck validator - \
they still can't certify view 3 (no context) and can't form a nullification \
(not enough votes). The f+1 honest validators who advanced to view 4 cannot \
retroactively help because they can only vote nullify for their current view (4), \
not for view 3."
);
// Now deliver a finalization for the same view-4 proposal.
let (_, finalization_4) = build_finalization(&schemes, &proposal_4, quorum);
mailbox
.resolved(Certificate::Finalization(finalization_4))
.await;
// `rescued` becomes true once an `Update` reports a view beyond view 4, or false if
// none arrives before the 5 second sleep arm fires.
let rescued = loop {
select! {
msg = batcher_receiver.recv() => {
if let batcher::Message::Update {
current, response, ..
} = msg.unwrap()
{
response.send(None).unwrap();
if current > view_4 {
break true;
}
}
},
_ = context.sleep(Duration::from_secs(5)) => {
break false;
},
}
};
assert!(
rescued,
"a finalization certificate SHOULD rescue the stuck validator - \
this is the ONLY escape route, but it requires Byzantine cooperation \
(they must vote finalize). If Byzantine permanently withhold finalize votes, \
the stuck validators are permanently excluded from consensus."
);
});
}
/// Runs the "only a finalization rescues a stuck validator" scenario once for every
/// signing-scheme fixture exercised by this test module.
#[test_traced]
fn test_only_finalization_rescues_validator() {
    // Generic arguments are fully inferred from the fixture function passed in.
    only_finalization_rescues_validator(bls12381_threshold_vrf::fixture::<MinPk, _>);
    only_finalization_rescues_validator(bls12381_threshold_vrf::fixture::<MinSig, _>);
    only_finalization_rescues_validator(bls12381_multisig::fixture::<MinPk, _>);
    only_finalization_rescues_validator(bls12381_multisig::fixture::<MinSig, _>);
    only_finalization_rescues_validator(ed25519::fixture);
    only_finalization_rescues_validator(secp256r1::fixture);
}
/// Checks that a voter which already voted notarize for a view still votes nullify for
/// that view once certification fails.
///
/// The application certifier is a custom closure that always returns `false`, and every
/// timeout passed to `setup_voter` is 100 seconds (longer than the 10 second runtime
/// budget). The voter is advanced to view 3, given a proposal, and must emit a notarize
/// vote; after a notarization certificate is delivered it must emit a nullify vote for
/// the same view within 5 seconds.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn certification_failure_allows_nullify_after_notarize<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"cert_fail_nullify".to_vec();
// The whole scenario must finish within 10 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, relay, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_secs(100), Duration::from_secs(100),
Duration::from_secs(100),
mocks::application::Certifier::Custom(Box::new(|_| false)),
)
.await;
let target_view = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view whose parent is the immediately preceding view.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"test_proposal"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal itself to the voter.
let leader = participants[1].clone();
let contents = (proposal.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal.clone()).await;
// Wait (up to 2 seconds) for the notarize vote for the target view.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Notarize(n)) if n.view() == target_view => {
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("expected notarize vote for view {target_view}");
},
}
}
// Deliver a notarization certificate for the proposal.
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Notarization(notarization))
.await;
// Wait (up to 5 seconds) for the nullify vote for the target view.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify)) if nullify.view() == target_view => {
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!(
"voter should emit nullify for view {target_view} after certification failure, \
even though it already voted notarize"
);
},
}
}
});
}
/// Runs the "certification failure allows nullify after notarize" scenario once for every
/// signing-scheme fixture exercised by this test module.
#[test_traced]
fn test_certification_failure_allows_nullify_after_notarize() {
    // Generic arguments are fully inferred from the fixture function passed in.
    certification_failure_allows_nullify_after_notarize(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    certification_failure_allows_nullify_after_notarize(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    certification_failure_allows_nullify_after_notarize(bls12381_multisig::fixture::<MinPk, _>);
    certification_failure_allows_nullify_after_notarize(bls12381_multisig::fixture::<MinSig, _>);
    certification_failure_allows_nullify_after_notarize(ed25519::fixture);
    certification_failure_allows_nullify_after_notarize(secp256r1::fixture);
}
/// Checks that a voter whose application uses `Certifier::Pending` still emits a nullify
/// vote for the affected view.
///
/// The voter is advanced to view 3, given a proposal, and must emit a notarize vote
/// within 2 seconds. After a notarization certificate is delivered, it must emit a
/// nullify vote for view 3 within 8 seconds.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn pending_certification_nullifies_on_timeout<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"pending_cert_nullify".to_vec();
// The whole scenario must finish within 10 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, relay, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_secs(3),
Duration::from_secs(4),
Duration::from_mins(60),
mocks::application::Certifier::Pending,
)
.await;
let target_view = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view whose parent is the immediately preceding view.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"test_proposal"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal itself to the voter.
let leader = participants[1].clone();
let contents = (proposal.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal.clone()).await;
// Wait (up to 2 seconds) for the notarize vote for the target view.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Notarize(n))
if n.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("expected notarize vote for view {target_view}");
},
}
}
// Deliver a notarization certificate for the proposal.
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.resolved(Certificate::Notarization(notarization))
.await;
// Wait (up to 8 seconds) for the nullify vote for the target view.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(8)) => {
panic!(
"voter should emit nullify for view {target_view} via timeout \
when certification hangs indefinitely",
);
},
}
}
});
}
/// Runs the pending-certification timeout scenario once for every signing-scheme fixture
/// exercised by this test module.
#[test_traced]
fn test_pending_certification_nullifies_on_timeout() {
    // Generic arguments are fully inferred from the fixture function passed in.
    pending_certification_nullifies_on_timeout(bls12381_threshold_vrf::fixture::<MinPk, _>);
    pending_certification_nullifies_on_timeout(bls12381_threshold_vrf::fixture::<MinSig, _>);
    pending_certification_nullifies_on_timeout(bls12381_multisig::fixture::<MinPk, _>);
    pending_certification_nullifies_on_timeout(bls12381_multisig::fixture::<MinSig, _>);
    pending_certification_nullifies_on_timeout(ed25519::fixture);
    pending_certification_nullifies_on_timeout(secp256r1::fixture);
}
/// Checks that, once a proposal has been received and voted on, the voter does not
/// nullify the view after the shorter (1 second) timeout, but does nullify it later.
///
/// The voter is advanced to view 3 with timeouts of 1 second and 5 seconds, given a
/// proposal, and must emit a notarize vote. For the following 2 seconds no nullify vote
/// for view 3 may appear; afterwards one must appear within 6 seconds.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn proposal_clears_leader_timeout_before_certification_timeout<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"proposal_clears_leader_timeout".to_vec();
// The whole scenario must finish within 15 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(15));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, relay, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_secs(1),
Duration::from_secs(5),
Duration::from_mins(60),
mocks::application::Certifier::Always,
)
.await;
let target_view = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view whose parent is the immediately preceding view.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"proposal_clears_leader_timeout"),
);
// Publish the payload contents through the relay on behalf of participant 1,
// then hand the proposal itself to the voter.
let leader = participants[1].clone();
let contents = (proposal.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal.payload, contents));
mailbox.proposal(proposal.clone()).await;
// Wait (up to 2 seconds) for the notarize vote for the target view.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Notarize(v)) if v.view() == target_view => {
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("expected notarize vote for view {target_view}");
},
}
}
// Quiet window: a fixed deadline (rather than a per-iteration sleep) so that handling
// unrelated messages does not extend the window.
let no_nullify_deadline = context.current() + Duration::from_secs(2);
loop {
select! {
_ = context.sleep_until(no_nullify_deadline) => break,
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
panic!(
"received nullify for view {target_view} before certification timeout"
);
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
}
}
}
// After the quiet window, the nullify vote must arrive within 6 seconds.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(6)) => {
panic!(
"expected nullify for view {target_view} after certification timeout"
);
},
}
}
});
}
/// Runs the "proposal clears leader timeout" scenario once for every signing-scheme
/// fixture exercised by this test module.
#[test_traced]
fn test_proposal_clears_leader_timeout_before_certification_timeout() {
    // Generic arguments are fully inferred from the fixture function passed in.
    proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_multisig::fixture::<MinPk, _>,
    );
    proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_multisig::fixture::<MinSig, _>,
    );
    proposal_clears_leader_timeout_before_certification_timeout(ed25519::fixture);
    proposal_clears_leader_timeout_before_certification_timeout(secp256r1::fixture);
}
/// Checks the timeout behaviour when the voter learns of a proposal only through a
/// recovered notarization certificate.
///
/// The voter (application uses `Certifier::Pending`, timeouts of 1 second and 5 seconds)
/// is advanced to view 3 and receives a notarization for a view-3 proposal via
/// `mailbox.recovered`. No proposal is relayed or delivered directly. For the following
/// 2 seconds neither a notarize nor a nullify vote for view 3 may appear; afterwards a
/// nullify vote must appear within 6 seconds.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn recovered_proposal_clears_leader_timeout_before_certification_timeout<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"recovered_proposal_clears_leader_timeout".to_vec();
// The whole scenario must finish within 15 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(15));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
let elector = RoundRobin::<Sha256>::default();
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, _, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
elector,
Duration::from_secs(1),
Duration::from_secs(5),
Duration::from_mins(60),
mocks::application::Certifier::Pending,
)
.await;
let target_view = View::new(3);
advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
target_view,
)
.await;
// Proposal for the target view; it reaches the voter only inside the certificate below.
let proposal = Proposal::new(
Round::new(Epoch::new(333), target_view),
target_view.previous().unwrap(),
Sha256::hash(b"recovered_proposal_clears_leader_timeout"),
);
let (_, notarization) = build_notarization(&schemes, &proposal, quorum);
mailbox
.recovered(Certificate::Notarization(notarization))
.await;
// Quiet window: a fixed deadline (rather than a per-iteration sleep) so that handling
// unrelated messages does not extend the window.
let quiet_deadline = context.current() + Duration::from_secs(2);
loop {
select! {
_ = context.sleep_until(quiet_deadline) => break,
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Notarize(v)) if v.view() == target_view => {
panic!(
"unexpected notarize for view {target_view} from recovered certificate"
);
}
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
panic!(
"received nullify for view {target_view} before certification timeout after recovered certificate"
);
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
}
}
}
// After the quiet window, the nullify vote must arrive within 6 seconds.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == target_view =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(6)) => {
panic!(
"expected nullify for view {target_view} after certification timeout with recovered certificate"
);
},
}
}
});
}
/// Runs the "recovered proposal clears leader timeout" scenario once for every
/// signing-scheme fixture exercised by this test module.
#[test_traced]
fn test_recovered_proposal_clears_leader_timeout_before_certification_timeout() {
    // Generic arguments are fully inferred from the fixture function passed in.
    recovered_proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    recovered_proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );
    recovered_proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_multisig::fixture::<MinPk, _>,
    );
    recovered_proposal_clears_leader_timeout_before_certification_timeout(
        bls12381_multisig::fixture::<MinSig, _>,
    );
    recovered_proposal_clears_leader_timeout_before_certification_timeout(ed25519::fixture);
    recovered_proposal_clears_leader_timeout_before_certification_timeout(secp256r1::fixture);
}
/// Checks that after view 1 is nullified, the voter does not immediately nullify view 2.
///
/// Steps performed below:
/// 1. Wait for the batcher `Update` reporting view 1, then for the voter's nullify vote
///    for view 1 (within 2 seconds).
/// 2. Deliver a nullification certificate for view 1.
/// 3. Wait for the `Update` reporting view 2, failing if a nullify vote for view 2 is
///    seen first.
/// 4. For a further 500ms, fail if any nullify vote for view 2 appears.
///
/// `fixture` produces the participants and schemes for scheme `S`.
fn next_view_gets_fresh_timeout_after_prior_view_nullifies<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"next_view_gets_fresh_timeout_after_prior_view_nullifies".to_vec();
// The whole scenario must finish within 15 seconds of runtime time.
let executor = deterministic::Runner::timed(Duration::from_secs(15));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
true,
)
.await;
// NOTE(review): the three durations are presumably (leader_timeout,
// certification_timeout, timeout_retry), mirroring `Config` — confirm against `setup_voter`.
let (mut mailbox, mut batcher_receiver, _, _, _) = setup_voter(
&mut context,
&oracle,
&participants,
&schemes,
RoundRobin::<Sha256>::default(),
Duration::from_secs(1),
Duration::from_secs(5),
Duration::from_mins(60),
mocks::application::Certifier::Always,
)
.await;
// Drain batcher messages until an `Update` reports that the voter is in view 1.
loop {
match batcher_receiver.recv().await.unwrap() {
batcher::Message::Update {
current, response, ..
} => {
response.send(None).unwrap();
if current == View::new(1) {
break;
}
}
batcher::Message::Constructed(_) => {}
}
}
// No proposal is supplied, so wait (up to 2 seconds) for the nullify vote for view 1.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == View::new(1) =>
{
break;
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("expected nullify for view 1");
},
}
}
// Deliver a nullification certificate for view 1.
let (_, nullification) =
build_nullification(&schemes, Round::new(Epoch::new(333), View::new(1)), quorum);
mailbox
.resolved(Certificate::Nullification(nullification))
.await;
// Wait (up to 2 seconds) for the `Update` announcing view 2; a nullify vote for
// view 2 arriving before that fails the test.
loop {
select! {
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == View::new(2) =>
{
panic!(
"received nullify for view 2 before its fresh leader timeout elapsed"
);
}
batcher::Message::Update {
current, response, ..
} => {
response.send(None).unwrap();
if current == View::new(2) {
break;
}
}
_ => {}
},
_ = context.sleep(Duration::from_secs(2)) => {
panic!("expected voter to advance to view 2");
},
}
}
// Quiet window of 500ms (shorter than the 1 second timeout configured above) in
// which no nullify vote for view 2 may be constructed.
let quiet_deadline = context.current() + Duration::from_millis(500);
loop {
select! {
_ = context.sleep_until(quiet_deadline) => break,
msg = batcher_receiver.recv() => match msg.unwrap() {
batcher::Message::Constructed(Vote::Nullify(nullify))
if nullify.view() == View::new(2) =>
{
panic!(
"received nullify for view 2 before its fresh leader timeout elapsed"
);
}
batcher::Message::Update { response, .. } => response.send(None).unwrap(),
_ => {}
}
}
}
});
}
/// Runs the fresh-timeout-after-nullification scenario once per scheme fixture listed below.
#[test_traced]
fn test_next_view_gets_fresh_timeout_after_prior_view_nullifies() {
    // Both generic parameters of the scenario are inferred from the fixture function.

    // BLS12-381 threshold VRF, both variants.
    next_view_gets_fresh_timeout_after_prior_view_nullifies(
        bls12381_threshold_vrf::fixture::<MinPk, _>,
    );
    next_view_gets_fresh_timeout_after_prior_view_nullifies(
        bls12381_threshold_vrf::fixture::<MinSig, _>,
    );

    // BLS12-381 multisig, both variants.
    next_view_gets_fresh_timeout_after_prior_view_nullifies(
        bls12381_multisig::fixture::<MinPk, _>,
    );
    next_view_gets_fresh_timeout_after_prior_view_nullifies(
        bls12381_multisig::fixture::<MinSig, _>,
    );

    // Ed25519 and secp256r1.
    next_view_gets_fresh_timeout_after_prior_view_nullifies(ed25519::fixture);
    next_view_gets_fresh_timeout_after_prior_view_nullifies(secp256r1::fixture);
}
/// Scenario: view 1 receives a proposal and a notarization, and the voter must progress to
/// view 2 without ever nullifying view 1.
///
/// `fixture` produces the participants and signing schemes; `L` is the elector configuration
/// used both to determine the view-1 leader and to drive the voter. The scenario panics if a
/// nullify vote for view 1 is constructed, if the voter reports view 2 (or later) before a
/// finalize vote for view 1 is seen, if view 2 is not reached within the 3s deadline, or if the
/// reporter recorded nullify votes or a nullification for view 1.
fn first_view_progress_without_timeout<S, F, L>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: ElectorConfig<S>,
{
    let participant_count = 5;
    let threshold = quorum(participant_count);
    let ns = b"first_view_progress_without_timeout".to_vec();
    deterministic::Runner::timed(Duration::from_secs(15)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &ns, participant_count);
        let oracle =
            start_test_network_with_peers(context.clone(), participants.clone(), true).await;

        // Work out who leads view 1 so the proposal can be broadcast on their behalf.
        let elector_config = L::default();
        let epoch = Epoch::new(333);
        let view_one = View::new(1);
        let view_two = View::new(2);
        let first_round = Round::new(epoch, view_one);
        let leader_index = elector_config
            .clone()
            .build(schemes[0].participants())
            .elect(first_round, None);
        let leader = participants[usize::from(leader_index)].clone();

        // NOTE(review): the three durations are assumed to be the leader timeout, the
        // certification timeout and the timeout retry (same order as `Config`); confirm
        // against `setup_voter`.
        let leader_timeout = Duration::from_secs(1);
        let certification_timeout = Duration::from_secs(5);
        let timeout_retry = Duration::from_mins(60);
        let (mut voter, mut batcher_rx, _, relay, reporter) = setup_voter(
            &mut context,
            &oracle,
            &participants,
            &schemes,
            elector_config,
            leader_timeout,
            certification_timeout,
            timeout_retry,
            mocks::application::Certifier::Always,
        )
        .await;

        // The very first batcher message must be the update announcing view 1.
        let batcher::Message::Update {
            current,
            finalized,
            response,
            ..
        } = batcher_rx.recv().await.unwrap()
        else {
            panic!("unexpected batcher message");
        };
        assert_eq!(current, view_one);
        assert_eq!(finalized, View::new(0));
        response.send(None).unwrap();

        // Digest of the encoded ("genesis", epoch) pair.
        // NOTE(review): presumably mirrors the mock application's genesis digest; confirm.
        let genesis_preimage = (bytes::Bytes::from_static(b"genesis"), epoch).encode();
        let mut digest_builder = Sha256::default();
        digest_builder.update(&genesis_preimage);
        let genesis = digest_builder.finalize();

        // Publish the leader's payload through the relay, then give the voter the proposal.
        let proposal = Proposal::new(
            first_round,
            View::zero(),
            Sha256::hash(b"first_view_progress_without_timeout"),
        );
        let payload_contents = (proposal.round, genesis, 0u64).encode();
        relay.broadcast(&leader, (proposal.payload, payload_contents));
        voter.proposal(proposal.clone()).await;

        // Expect a notarize vote for view 1; a nullify for view 1 is a failure.
        let step_timeout = Duration::from_secs(2);
        loop {
            select! {
                incoming = batcher_rx.recv() => match incoming.unwrap() {
                    batcher::Message::Constructed(Vote::Notarize(vote))
                        if vote.view() == view_one =>
                    {
                        break;
                    }
                    batcher::Message::Constructed(Vote::Nullify(vote))
                        if vote.view() == view_one =>
                    {
                        panic!("unexpected nullify for view 1 while peers are online");
                    }
                    batcher::Message::Update { response, .. } => response.send(None).unwrap(),
                    _ => {}
                },
                _ = context.sleep(step_timeout) => {
                    panic!("expected notarize for view 1");
                },
            }
        }

        // Resolve a notarization for the proposal.
        let (_, notarization) = build_notarization(&schemes, &proposal, threshold);
        voter
            .resolved(Certificate::Notarization(notarization))
            .await;

        // Phase 1: up to the deadline, the voter must not report view 2 (or later) before a
        // finalize vote for view 1 is constructed.
        let progress_deadline = context.current() + Duration::from_secs(3);
        let advanced_before_finalize = loop {
            select! {
                _ = context.sleep_until(progress_deadline) => break false,
                incoming = batcher_rx.recv() => match incoming.unwrap() {
                    batcher::Message::Constructed(Vote::Finalize(vote))
                        if vote.view() == view_one =>
                    {
                        break false;
                    }
                    batcher::Message::Constructed(Vote::Nullify(vote))
                        if vote.view() == view_one =>
                    {
                        panic!("unexpected nullify for view 1 while peers are online");
                    }
                    batcher::Message::Update {
                        current, response, ..
                    } => {
                        response.send(None).unwrap();
                        if current >= view_two {
                            break true;
                        }
                    }
                    _ => {}
                },
            }
        };
        assert!(
            !advanced_before_finalize,
            "view advanced before finalize for view 1"
        );

        // Phase 2: before that same deadline, the voter must report view 2 (or later).
        let advanced = loop {
            select! {
                _ = context.sleep_until(progress_deadline) => break false,
                incoming = batcher_rx.recv() => match incoming.unwrap() {
                    batcher::Message::Constructed(Vote::Nullify(vote))
                        if vote.view() == view_one =>
                    {
                        panic!("unexpected nullify for view 1 while peers are online");
                    }
                    batcher::Message::Update {
                        current, response, ..
                    } => {
                        response.send(None).unwrap();
                        if current >= view_two {
                            break true;
                        }
                    }
                    _ => {}
                },
            }
        };
        assert!(advanced, "expected progress to view 2 from view 1");

        // Wait briefly before inspecting what the reporter recorded for view 1.
        context.sleep(Duration::from_millis(50)).await;
        let reported_nullify_votes = reporter.nullifies.lock().contains_key(&view_one);
        assert!(
            !reported_nullify_votes,
            "did not expect nullify votes for view 1"
        );
        let reported_nullification = reporter.nullifications.lock().contains_key(&view_one);
        assert!(
            !reported_nullification,
            "did not expect a nullification certificate for view 1"
        );
    });
}
/// Runs the first-view-progress scenario once per scheme fixture listed below.
///
/// The third generic argument selects the elector configuration: the threshold-VRF fixtures
/// are paired with `Random`, every other fixture with `RoundRobin`.
// NOTE(review): presumably `Random` is only usable with the VRF-capable scheme; confirm
// against the elector implementations.
#[test_traced]
fn test_first_view_progress_without_timeout() {
// BLS12-381 threshold VRF (both variants) with the `Random` elector.
first_view_progress_without_timeout::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
first_view_progress_without_timeout::<_, _, Random>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
// BLS12-381 multisig (both variants) with the `RoundRobin` elector.
first_view_progress_without_timeout::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinPk, _>,
);
first_view_progress_without_timeout::<_, _, RoundRobin>(
bls12381_multisig::fixture::<MinSig, _>,
);
// Ed25519 and secp256r1 with the `RoundRobin` elector.
first_view_progress_without_timeout::<_, _, RoundRobin>(ed25519::fixture);
first_view_progress_without_timeout::<_, _, RoundRobin>(secp256r1::fixture);
}
}