mod actor;
mod ingress;
mod round;
mod verifier;
use crate::{
simplex::config::ForwardingPolicy,
types::{Epoch, ViewDelta},
Relay, Reporter,
};
pub use actor::Actor;
use commonware_cryptography::certificate::Scheme;
use commonware_p2p::Blocker;
use commonware_parallel::Strategy;
pub use ingress::{Mailbox, Message};
pub use round::Round;
pub use verifier::Verifier;
pub struct Config<S: Scheme, B: Blocker, Re: Reporter, Rl: Relay, T: Strategy> {
pub scheme: S,
pub blocker: B,
pub reporter: Re,
pub relay: Rl,
pub strategy: T,
pub activity_timeout: ViewDelta,
pub skip_timeout: ViewDelta,
pub epoch: Epoch,
pub mailbox_size: usize,
pub forwarding: ForwardingPolicy,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
simplex::{
actors::voter,
config::ForwardingPolicy,
elector::RoundRobin,
mocks, quorum,
scheme::{
bls12381_multisig,
bls12381_threshold::{
standard as bls12381_threshold_std, vrf as bls12381_threshold_vrf,
},
ed25519, secp256r1, Scheme,
},
types::{
Activity, Certificate, Finalization, Finalize, Notarization, Notarize,
Nullification, Nullify, Proposal, Vote,
},
Plan,
},
types::{Participant, Round, View},
Viewable,
};
use commonware_codec::Encode;
use commonware_cryptography::{
bls12381::primitives::variant::{MinPk, MinSig},
certificate::mocks::Fixture,
ed25519::{PrivateKey, PublicKey},
sha256::Digest as Sha256Digest,
Hasher as _, Sha256, Signer,
};
use commonware_macros::{select, test_traced};
use commonware_p2p::{
simulated::{Config as NConfig, Link, Network, Oracle},
Manager as _, Recipients, Sender as _, TrackedPeers,
};
use commonware_parallel::Sequential;
use commonware_runtime::{deterministic, Clock, Metrics, Quota, Runner};
use commonware_utils::{channel::mpsc, ordered::Set, sync::Mutex, NZUsize};
use std::{num::NonZeroU32, sync::Arc, time::Duration};
type Broadcasts = Arc<Mutex<Vec<(Sha256Digest, Round, Vec<PublicKey>)>>>;
#[derive(Clone)]
struct MockRelay {
broadcasts: Broadcasts,
}
impl MockRelay {
fn new() -> Self {
Self {
broadcasts: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl crate::Relay for MockRelay {
type Digest = Sha256Digest;
type PublicKey = PublicKey;
type Plan = Plan<PublicKey>;
async fn broadcast(&mut self, payload: Sha256Digest, plan: Self::Plan) {
if let Plan::Forward { round, peers } = plan {
self.broadcasts.lock().push((payload, round, peers));
}
}
}
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
async fn start_test_network_with_peers<I>(
context: deterministic::Context,
peers: I,
) -> Oracle<PublicKey, deterministic::Context>
where
I: IntoIterator<Item = PublicKey>,
{
let (network, oracle) = Network::new_with_peers(
context.with_label("network"),
NConfig {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
peers,
)
.await;
network.start();
oracle
}
async fn track_test_peers(
context: &mut deterministic::Context,
oracle: &commonware_p2p::simulated::Oracle<PublicKey, deterministic::Context>,
id: u64,
primary: &[PublicKey],
secondary: &[PublicKey],
) {
oracle
.manager()
.track(
id,
TrackedPeers::new(
Set::from_iter_dedup(primary.iter().cloned()),
Set::from_iter_dedup(secondary.iter().cloned()),
),
)
.await;
context.sleep(Duration::from_millis(10)).await;
}
fn build_notarization<S: Scheme<Sha256Digest>>(
schemes: &[S],
proposal: &Proposal<Sha256Digest>,
count: usize,
) -> Notarization<S, Sha256Digest> {
let votes: Vec<_> = schemes
.iter()
.take(count)
.map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
.collect();
Notarization::from_notarizes(&schemes[0], &votes, &Sequential)
.expect("notarization requires a quorum of votes")
}
fn build_nullification<S: Scheme<Sha256Digest>>(
schemes: &[S],
round: Round,
count: usize,
) -> Nullification<S> {
let votes: Vec<_> = schemes
.iter()
.take(count)
.map(|scheme| Nullify::sign::<Sha256Digest>(scheme, round).unwrap())
.collect();
Nullification::from_nullifies(&schemes[0], &votes, &Sequential)
.expect("nullification requires a quorum of votes")
}
fn build_finalization<S: Scheme<Sha256Digest>>(
schemes: &[S],
proposal: &Proposal<Sha256Digest>,
count: usize,
) -> Finalization<S, Sha256Digest> {
let votes: Vec<_> = schemes
.iter()
.take(count)
.map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
.collect();
Finalization::from_finalizes(&schemes[0], &votes, &Sequential)
.expect("finalization requires a quorum of votes")
}
fn certificate_forwarding_from_network<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n) as usize;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
let (mut injector_sender, _injector_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(injector_pk.clone(), me.clone(), link)
.await
.unwrap();
track_test_peers(
&mut context,
&oracle,
1,
&participants,
std::slice::from_ref(&injector_pk),
)
.await;
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let nullify = batcher_mailbox.update(view, Participant::new(0), View::zero(), None).await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let notarization = build_notarization(&schemes, &proposal, quorum);
let nullification = build_nullification(&schemes, round, quorum);
let finalization = build_finalization(&schemes, &proposal, quorum);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Notarization(notarization.clone()).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view)
);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::<S, Sha256Digest>::Nullification(nullification.clone())
.encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Nullification(n), _) if n.view() == view)
);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Finalization(finalization.clone()).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Finalization(f), _) if f.view() == view)
);
});
}
#[test_traced]
fn test_certificate_forwarding_from_network() {
certificate_forwarding_from_network(bls12381_threshold_vrf::fixture::<MinPk, _>);
certificate_forwarding_from_network(bls12381_threshold_vrf::fixture::<MinSig, _>);
certificate_forwarding_from_network(bls12381_threshold_std::fixture::<MinPk, _>);
certificate_forwarding_from_network(bls12381_threshold_std::fixture::<MinSig, _>);
certificate_forwarding_from_network(bls12381_multisig::fixture::<MinPk, _>);
certificate_forwarding_from_network(bls12381_multisig::fixture::<MinSig, _>);
certificate_forwarding_from_network(ed25519::fixture);
certificate_forwarding_from_network(secp256r1::fixture);
}
fn old_notarization_after_nullification_is_forwarded<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_old_notarization_after_nullification".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let injector_pk = PrivateKey::from_seed(1_000_001).public_key();
let (mut injector_sender, _injector_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(injector_pk.clone(), me.clone(), link)
.await
.unwrap();
track_test_peers(
&mut context,
&oracle,
1,
&participants,
std::slice::from_ref(&injector_pk),
)
.await;
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let target_view = View::new(1);
let nullify = batcher_mailbox
.update(target_view, Participant::new(0), View::zero(), None)
.await;
assert!(nullify.is_none());
let round = Round::new(epoch, target_view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let nullification = build_nullification(&schemes, round, quorum_size);
let notarization = build_notarization(&schemes, &proposal, quorum_size);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::<S, Sha256Digest>::Nullification(nullification).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Nullification(n), _) if n.view() == target_view)
);
let nullify = batcher_mailbox
.update(target_view.next(), Participant::new(1), View::zero(), None)
.await;
assert!(nullify.is_none());
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Notarization(notarization).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == target_view)
);
});
}
#[test_traced]
fn test_old_notarization_after_nullification_is_forwarded() {
old_notarization_after_nullification_is_forwarded(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
old_notarization_after_nullification_is_forwarded(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
old_notarization_after_nullification_is_forwarded(
bls12381_threshold_std::fixture::<MinPk, _>,
);
old_notarization_after_nullification_is_forwarded(
bls12381_threshold_std::fixture::<MinSig, _>,
);
old_notarization_after_nullification_is_forwarded(bls12381_multisig::fixture::<MinPk, _>);
old_notarization_after_nullification_is_forwarded(bls12381_multisig::fixture::<MinSig, _>);
old_notarization_after_nullification_is_forwarded(ed25519::fixture);
old_notarization_after_nullification_is_forwarded(secp256r1::fixture);
}
fn quorum_votes_construct_certificate<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle.control(pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let leader = Participant::new(1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
for i in 1..quorum_size {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox
.constructed(Vote::Notarize(our_vote))
.await;
context.sleep(Duration::from_millis(100)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(&output, voter::Message::Proposal(p) if p.view() == view && p.payload == Sha256::hash(b"test_payload"))
);
let output = voter_receiver.recv().await.unwrap();
assert!(matches!(output, voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view));
assert!(
relay.broadcasts.lock().is_empty(),
"disabled forwarding should produce no broadcasts"
);
});
}
#[test_traced]
fn test_quorum_votes_construct_certificate() {
quorum_votes_construct_certificate(bls12381_threshold_vrf::fixture::<MinPk, _>);
quorum_votes_construct_certificate(bls12381_threshold_vrf::fixture::<MinSig, _>);
quorum_votes_construct_certificate(bls12381_threshold_std::fixture::<MinPk, _>);
quorum_votes_construct_certificate(bls12381_threshold_std::fixture::<MinSig, _>);
quorum_votes_construct_certificate(bls12381_multisig::fixture::<MinPk, _>);
quorum_votes_construct_certificate(bls12381_multisig::fixture::<MinSig, _>);
quorum_votes_construct_certificate(ed25519::fixture);
quorum_votes_construct_certificate(secp256r1::fixture);
}
fn forward_emitted_on_view_advance_with_forwardable_proposal<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_forwarding".to_vec();
let epoch = Epoch::new(1);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentVoters,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
for i in 1..quorum_size {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote)).await;
context.sleep(Duration::from_millis(100)).await;
let _ = voter_receiver.recv().await.unwrap();
let _ = voter_receiver.recv().await.unwrap();
{
let broadcasts = relay.broadcasts.lock();
assert!(
broadcasts.is_empty(),
"notarization alone should not trigger forwarding"
);
}
batcher_mailbox
.update(
View::new(2),
Participant::new(2),
View::zero(),
Some(proposal.clone()),
)
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal.payload);
assert_eq!(forwarded_round, proposal.round);
assert_eq!(peers, &vec![participants[4].clone()]);
});
}
#[test_traced]
fn test_forward_emitted_on_view_advance_with_forwardable_proposal() {
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_threshold_std::fixture::<MinPk, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_threshold_std::fixture::<MinSig, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_multisig::fixture::<MinPk, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(
bls12381_multisig::fixture::<MinSig, _>,
);
forward_emitted_on_view_advance_with_forwardable_proposal(ed25519::fixture);
forward_emitted_on_view_advance_with_forwardable_proposal(secp256r1::fixture);
}
fn silent_leader_forwarding_respects_missing_vote<S, F>(mut fixture: F, leader_voted: bool)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_silent_leader_forwarding".to_vec();
let epoch = Epoch::new(101);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentLeader,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let next_leader = Participant::new(2);
batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
let proposal = Proposal::new(
Round::new(epoch, view),
View::zero(),
Sha256::hash(b"silent_leader_payload"),
);
let voter_indices: &[usize] = if leader_voted { &[1, 2, 3] } else { &[1, 3, 4] };
for &i in voter_indices {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote)).await;
let mut saw_notarization = false;
loop {
let output = select! {
output = voter_receiver.recv() => output,
_ = context.sleep(Duration::from_millis(100)) => None,
};
let Some(output) = output else {
break;
};
if matches!(
output,
voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view
) {
saw_notarization = true;
break;
}
}
assert!(saw_notarization, "expected notarization");
{
let broadcasts = relay.broadcasts.lock();
assert!(
broadcasts.is_empty(),
"notarization alone should not trigger forwarding"
);
}
batcher_mailbox
.update(
View::new(2),
next_leader,
View::zero(),
Some(proposal.clone()),
)
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
if leader_voted {
assert!(
broadcasts.is_empty(),
"next leader should not be forwarded to when their vote was observed"
);
} else {
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal.payload);
assert_eq!(forwarded_round, proposal.round);
assert_eq!(peers, &vec![participants[2].clone()]);
}
});
}
#[test_traced]
fn test_silent_leader_forwarding_targets_missing_leader() {
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_vrf::fixture::<MinPk, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_vrf::fixture::<MinSig, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_std::fixture::<MinPk, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_std::fixture::<MinSig, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_multisig::fixture::<MinPk, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_multisig::fixture::<MinSig, _>,
false,
);
silent_leader_forwarding_respects_missing_vote(ed25519::fixture, false);
silent_leader_forwarding_respects_missing_vote(secp256r1::fixture, false);
}
#[test_traced]
fn test_silent_leader_forwarding_skips_observed_leader() {
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_vrf::fixture::<MinPk, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_vrf::fixture::<MinSig, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_std::fixture::<MinPk, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_threshold_std::fixture::<MinSig, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_multisig::fixture::<MinPk, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(
bls12381_multisig::fixture::<MinSig, _>,
true,
);
silent_leader_forwarding_respects_missing_vote(ed25519::fixture, true);
silent_leader_forwarding_respects_missing_vote(secp256r1::fixture, true);
}
fn forward_emitted_for_network_notarization_on_view_advance<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_network_notarization_forwarding".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentVoters,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
let injector_pk = PrivateKey::from_seed(2_000_000).public_key();
let (mut injector_sender, _injector_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(injector_pk.clone(), me.clone(), link.clone())
.await
.unwrap();
track_test_peers(
&mut context,
&oracle,
1,
&participants,
std::slice::from_ref(&injector_pk),
)
.await;
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
let proposal = Proposal::new(
Round::new(epoch, view),
View::zero(),
Sha256::hash(b"payload"),
);
for i in 1..(quorum_size - 1) {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote)).await;
let notarization = build_notarization(&schemes, &proposal, quorum_size);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Notarization(notarization).encode(),
true,
)
.await
.unwrap();
let mut saw_notarization = false;
loop {
let output = select! {
output = voter_receiver.recv() => output,
_ = context.sleep(Duration::from_millis(100)) => None,
};
let Some(output) = output else {
break;
};
if matches!(
output,
voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view
) {
saw_notarization = true;
break;
}
}
assert!(
saw_notarization,
"expected notarization from certificate_receiver"
);
{
let broadcasts = relay.broadcasts.lock();
assert!(
broadcasts.is_empty(),
"network notarization alone should not trigger forwarding"
);
}
batcher_mailbox
.update(
View::new(2),
Participant::new(2),
View::zero(),
Some(proposal.clone()),
)
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal.payload);
assert_eq!(forwarded_round, proposal.round);
assert_eq!(
peers,
&vec![participants[3].clone(), participants[4].clone()]
);
});
}
#[test_traced]
fn test_forward_emitted_for_network_notarization_on_view_advance() {
forward_emitted_for_network_notarization_on_view_advance(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
forward_emitted_for_network_notarization_on_view_advance(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
forward_emitted_for_network_notarization_on_view_advance(
bls12381_threshold_std::fixture::<MinPk, _>,
);
forward_emitted_for_network_notarization_on_view_advance(
bls12381_threshold_std::fixture::<MinSig, _>,
);
forward_emitted_for_network_notarization_on_view_advance(
bls12381_multisig::fixture::<MinPk, _>,
);
forward_emitted_for_network_notarization_on_view_advance(
bls12381_multisig::fixture::<MinSig, _>,
);
forward_emitted_for_network_notarization_on_view_advance(ed25519::fixture);
forward_emitted_for_network_notarization_on_view_advance(secp256r1::fixture);
}
fn self_excluded_from_forward_targets<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_self_excluded_forward_targets".to_vec();
let epoch = Epoch::new(444);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentVoters,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let injector_pk = PrivateKey::from_seed(3_000_000).public_key();
let (mut injector_sender, _injector_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(injector_pk.clone(), me.clone(), link)
.await
.unwrap();
track_test_peers(
&mut context,
&oracle,
1,
&participants,
std::slice::from_ref(&injector_pk),
)
.await;
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
let proposal = Proposal::new(
Round::new(epoch, view),
View::zero(),
Sha256::hash(b"certificate_only_payload"),
);
let notarization = build_notarization(&schemes, &proposal, quorum_size);
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Notarization(notarization).encode(),
true,
)
.await
.unwrap();
let mut saw_notarization = false;
loop {
let output = select! {
output = voter_receiver.recv() => output,
_ = context.sleep(Duration::from_millis(100)) => None,
};
let Some(output) = output else {
break;
};
if matches!(
output,
voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view
) {
saw_notarization = true;
break;
}
}
assert!(
saw_notarization,
"expected notarization from certificate_receiver"
);
batcher_mailbox
.update(
View::new(2),
Participant::new(2),
View::zero(),
Some(proposal.clone()),
)
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal.payload);
assert_eq!(forwarded_round, proposal.round);
assert_eq!(peers, &participants[1..].to_vec());
assert!(
!peers.contains(&participants[0]),
"batcher must not target itself when forwarding"
);
});
}
#[test_traced]
fn test_self_excluded_from_forward_targets() {
self_excluded_from_forward_targets(bls12381_threshold_vrf::fixture::<MinPk, _>);
self_excluded_from_forward_targets(bls12381_threshold_vrf::fixture::<MinSig, _>);
self_excluded_from_forward_targets(bls12381_threshold_std::fixture::<MinPk, _>);
self_excluded_from_forward_targets(bls12381_threshold_std::fixture::<MinSig, _>);
self_excluded_from_forward_targets(bls12381_multisig::fixture::<MinPk, _>);
self_excluded_from_forward_targets(bls12381_multisig::fixture::<MinSig, _>);
self_excluded_from_forward_targets(ed25519::fixture);
self_excluded_from_forward_targets(secp256r1::fixture);
}
fn conflicting_notarize_voter_is_forwarded<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 7;
let namespace = b"batcher_conflicting_notarize_forwarding".to_vec();
let epoch = Epoch::new(444);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentVoters,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view2 = View::new(2);
let leader2 = Participant::new(1);
batcher_mailbox
.update(view2, leader2, View::zero(), None)
.await;
let round2 = Round::new(epoch, view2);
let proposal_a = Proposal::new(round2, View::new(1), Sha256::hash(b"proposal_a"));
let proposal_b = Proposal::new(round2, View::new(1), Sha256::hash(b"proposal_b"));
let leader_vote = Notarize::sign(&schemes[1], proposal_a.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[1] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(leader_vote).encode(),
true,
)
.await
.unwrap();
}
let active_nullify = Nullify::sign::<Sha256Digest>(&schemes[6], round2).unwrap();
if let Some(ref mut sender) = participant_senders[6] {
sender
.send(
Recipients::One(me.clone()),
Vote::<S, Sha256Digest>::Nullify(active_nullify).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(50)).await;
let conflicting_vote = Notarize::sign(&schemes[2], proposal_b).unwrap();
if let Some(ref mut sender) = participant_senders[2] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(conflicting_vote).encode(),
true,
)
.await
.unwrap();
}
for i in 3..=5 {
let honest_vote = Notarize::sign(&schemes[i], proposal_a.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(honest_vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote2 = Notarize::sign(&schemes[0], proposal_a.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote2)).await;
context.sleep(Duration::from_millis(100)).await;
let mut saw_notarization = false;
loop {
let output = select! {
output = voter_receiver.recv() => output,
_ = context.sleep(Duration::from_millis(100)) => None,
};
let Some(output) = output else {
break;
};
match output {
voter::Message::Proposal(p) => {
assert_eq!(p.view(), view2);
assert_eq!(p.payload, proposal_a.payload);
}
voter::Message::Verified(Certificate::Notarization(n), _) => {
assert_eq!(n.view(), view2);
assert_eq!(n.proposal.payload, proposal_a.payload);
saw_notarization = true;
break;
}
_ => panic!("unexpected batcher output"),
}
}
assert!(
saw_notarization,
"expected notarization for the leader proposal"
);
{
let broadcasts = relay.broadcasts.lock();
assert!(
broadcasts.is_empty(),
"notarization alone should not trigger forwarding"
);
}
let view3 = View::new(3);
let leader3 = Participant::new(3);
batcher_mailbox
.update(view3, leader3, View::zero(), Some(proposal_a.clone()))
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal_a.payload);
assert_eq!(forwarded_round, proposal_a.round);
assert_eq!(
peers,
&vec![participants[2].clone(), participants[6].clone()]
);
});
}
#[test_traced]
fn test_conflicting_notarize_voter_is_forwarded() {
conflicting_notarize_voter_is_forwarded(bls12381_threshold_vrf::fixture::<MinPk, _>);
conflicting_notarize_voter_is_forwarded(bls12381_threshold_vrf::fixture::<MinSig, _>);
conflicting_notarize_voter_is_forwarded(bls12381_threshold_std::fixture::<MinPk, _>);
conflicting_notarize_voter_is_forwarded(bls12381_threshold_std::fixture::<MinSig, _>);
conflicting_notarize_voter_is_forwarded(bls12381_multisig::fixture::<MinPk, _>);
conflicting_notarize_voter_is_forwarded(bls12381_multisig::fixture::<MinSig, _>);
conflicting_notarize_voter_is_forwarded(ed25519::fixture);
conflicting_notarize_voter_is_forwarded(secp256r1::fixture);
}
fn finalize_voter_excluded_from_forwarding<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 7;
let namespace = b"batcher_finalize_voter_forwarding".to_vec();
let epoch = Epoch::new(555);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let relay = MockRelay::new();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: relay.clone(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::SilentVoters,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view2 = View::new(2);
let leader2 = Participant::new(1);
batcher_mailbox
.update(view2, leader2, View::zero(), None)
.await;
let round2 = Round::new(epoch, view2);
let proposal = Proposal::new(round2, View::new(1), Sha256::hash(b"payload"));
let finalize_vote = Finalize::sign(&schemes[6], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[6] {
sender
.send(
Recipients::One(me.clone()),
Vote::Finalize(finalize_vote).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(5)).await;
for i in 1..5 {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote)).await;
context.sleep(Duration::from_millis(100)).await;
let mut saw_notarization = false;
loop {
let output = select! {
output = voter_receiver.recv() => output,
_ = context.sleep(Duration::from_millis(100)) => None,
};
let Some(output) = output else {
break;
};
match output {
voter::Message::Verified(Certificate::Notarization(n), _) => {
assert_eq!(n.view(), view2);
saw_notarization = true;
break;
}
voter::Message::Proposal(_) => {}
_ => panic!("unexpected batcher output"),
}
}
assert!(saw_notarization, "expected notarization");
{
let broadcasts = relay.broadcasts.lock();
assert!(
broadcasts.is_empty(),
"notarization alone should not trigger forwarding"
);
}
let view3 = View::new(3);
batcher_mailbox
.update(
view3,
Participant::new(3),
View::zero(),
Some(proposal.clone()),
)
.await;
context.sleep(Duration::from_millis(50)).await;
let broadcasts = relay.broadcasts.lock();
assert_eq!(
broadcasts.len(),
1,
"expected exactly one targeted broadcast"
);
let (ref digest, forwarded_round, ref peers) = broadcasts[0];
assert_eq!(*digest, proposal.payload);
assert_eq!(forwarded_round, proposal.round);
assert_eq!(peers, &vec![participants[5].clone()]);
});
}
#[test_traced]
fn test_finalize_voter_excluded_from_forwarding() {
finalize_voter_excluded_from_forwarding(bls12381_threshold_vrf::fixture::<MinPk, _>);
finalize_voter_excluded_from_forwarding(bls12381_threshold_vrf::fixture::<MinSig, _>);
finalize_voter_excluded_from_forwarding(bls12381_threshold_std::fixture::<MinPk, _>);
finalize_voter_excluded_from_forwarding(bls12381_threshold_std::fixture::<MinSig, _>);
finalize_voter_excluded_from_forwarding(bls12381_multisig::fixture::<MinPk, _>);
finalize_voter_excluded_from_forwarding(bls12381_multisig::fixture::<MinSig, _>);
finalize_voter_excluded_from_forwarding(ed25519::fixture);
finalize_voter_excluded_from_forwarding(secp256r1::fixture);
}
fn votes_and_certificate_deduplication<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle.control(pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
let (mut injector_sender, _injector_receiver) = oracle
.control(injector_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(injector_pk.clone(), me.clone(), link.clone())
.await
.unwrap();
track_test_peers(
&mut context,
&oracle,
1,
&participants,
std::slice::from_ref(&injector_pk),
)
.await;
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let leader = Participant::new(1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let notarization = build_notarization(&schemes, &proposal, quorum_size);
for i in 1..quorum_size - 1 {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox.constructed(Vote::Notarize(our_vote)).await;
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(matches!(&output, voter::Message::Proposal(p) if p.view() == view));
injector_sender
.send(
Recipients::One(me.clone()),
Certificate::Notarization(notarization.clone()).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Notarization(n), _) if n.view() == view)
);
let last_vote =
Notarize::sign(&schemes[quorum_size - 1], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[quorum_size - 1] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(last_vote).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(50)).await;
let got_duplicate = select! {
_ = voter_receiver.recv() => { true },
_ = context.sleep(Duration::from_millis(100)) => { false },
};
assert!(!got_duplicate, "Should not receive duplicate certificate");
});
}
#[test_traced]
fn test_votes_and_certificate_deduplication() {
votes_and_certificate_deduplication(bls12381_threshold_vrf::fixture::<MinPk, _>);
votes_and_certificate_deduplication(bls12381_threshold_vrf::fixture::<MinSig, _>);
votes_and_certificate_deduplication(bls12381_threshold_std::fixture::<MinPk, _>);
votes_and_certificate_deduplication(bls12381_threshold_std::fixture::<MinSig, _>);
votes_and_certificate_deduplication(bls12381_multisig::fixture::<MinPk, _>);
votes_and_certificate_deduplication(bls12381_multisig::fixture::<MinSig, _>);
votes_and_certificate_deduplication(ed25519::fixture);
votes_and_certificate_deduplication(secp256r1::fixture);
}
fn conflicting_votes_dont_produce_invalid_certificate<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 7;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle.control(pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let leader = Participant::new(1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal_a = Proposal::new(round, View::zero(), Sha256::hash(b"payload_a"));
let proposal_b = Proposal::new(round, View::zero(), Sha256::hash(b"payload_b"));
let leader_vote =
Notarize::sign(&schemes[1], proposal_a.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[1] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(leader_vote).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(matches!(
&output,
voter::Message::Proposal(p) if p.view() == view && p.payload == Sha256::hash(b"payload_a")
));
for i in 2..=5 {
let vote = Notarize::sign(&schemes[i], proposal_b.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
context.sleep(Duration::from_millis(100)).await;
let got_certificate = select! {
_output = voter_receiver.recv() => { true },
_ = context.sleep(Duration::from_millis(100)) => { false },
};
assert!(
!got_certificate,
"Should not have certificate - only 1 vote for leader's proposal"
);
let our_vote = Notarize::sign(&schemes[0], proposal_a.clone()).unwrap();
batcher_mailbox
.constructed(Vote::Notarize(our_vote))
.await;
let vote6 = Notarize::sign(&schemes[6], proposal_a.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[6] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote6).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(100)).await;
let got_certificate = select! {
_output = voter_receiver.recv() => { true },
_ = context.sleep(Duration::from_millis(100)) => { false },
};
assert!(
!got_certificate,
"Should not have certificate - only 3 votes for leader's proposal"
);
});
}
#[test_traced]
fn test_conflicting_votes_dont_produce_invalid_certificate() {
conflicting_votes_dont_produce_invalid_certificate(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
conflicting_votes_dont_produce_invalid_certificate(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
conflicting_votes_dont_produce_invalid_certificate(
bls12381_threshold_std::fixture::<MinPk, _>,
);
conflicting_votes_dont_produce_invalid_certificate(
bls12381_threshold_std::fixture::<MinSig, _>,
);
conflicting_votes_dont_produce_invalid_certificate(bls12381_multisig::fixture::<MinPk, _>);
conflicting_votes_dont_produce_invalid_certificate(bls12381_multisig::fixture::<MinSig, _>);
conflicting_votes_dont_produce_invalid_certificate(ed25519::fixture);
conflicting_votes_dont_produce_invalid_certificate(secp256r1::fixture);
}
fn proposal_forwarded_after_leader_set<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let leader_pk = participants[1].clone();
let (mut leader_sender, _leader_receiver) =
oracle.control(leader_pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(leader_pk.clone(), me.clone(), link.clone())
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let leader = Participant::new(1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
context.sleep(Duration::from_millis(10)).await;
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let leader_vote = Notarize::sign(&schemes[1], proposal.clone()).unwrap();
leader_sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(leader_vote).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(&output, voter::Message::Proposal(p) if p.view() == view && p.payload == Sha256::hash(b"test_payload")),
"Expected proposal to be forwarded after leader set"
);
});
}
#[test_traced]
fn test_proposal_forwarded_after_leader_set() {
proposal_forwarded_after_leader_set(bls12381_threshold_vrf::fixture::<MinPk, _>);
proposal_forwarded_after_leader_set(bls12381_threshold_vrf::fixture::<MinSig, _>);
proposal_forwarded_after_leader_set(bls12381_threshold_std::fixture::<MinPk, _>);
proposal_forwarded_after_leader_set(bls12381_threshold_std::fixture::<MinSig, _>);
proposal_forwarded_after_leader_set(bls12381_multisig::fixture::<MinPk, _>);
proposal_forwarded_after_leader_set(bls12381_multisig::fixture::<MinSig, _>);
proposal_forwarded_after_leader_set(ed25519::fixture);
proposal_forwarded_after_leader_set(secp256r1::fixture);
}
fn proposal_forwarded_before_leader_set<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let leader_pk = participants[1].clone();
let (mut leader_sender, _leader_receiver) =
oracle.control(leader_pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(leader_pk.clone(), me.clone(), link.clone())
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let leader_vote = Notarize::sign(&schemes[1], proposal.clone()).unwrap();
leader_sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(leader_vote).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let leader = Participant::new(1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(&output, voter::Message::Proposal(p) if p.view() == view && p.payload == Sha256::hash(b"test_payload")),
"Expected proposal to be forwarded after leader set (vote arrived before leader was known)"
);
});
}
#[test_traced]
fn test_proposal_forwarded_before_leader_set() {
proposal_forwarded_before_leader_set(bls12381_threshold_vrf::fixture::<MinPk, _>);
proposal_forwarded_before_leader_set(bls12381_threshold_vrf::fixture::<MinSig, _>);
proposal_forwarded_before_leader_set(bls12381_threshold_std::fixture::<MinPk, _>);
proposal_forwarded_before_leader_set(bls12381_threshold_std::fixture::<MinSig, _>);
proposal_forwarded_before_leader_set(bls12381_multisig::fixture::<MinPk, _>);
proposal_forwarded_before_leader_set(bls12381_multisig::fixture::<MinSig, _>);
proposal_forwarded_before_leader_set(ed25519::fixture);
proposal_forwarded_before_leader_set(secp256r1::fixture);
}
fn leader_activity_detection<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let skip_timeout = 5u64;
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(skip_timeout),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, _voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) =
oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_certificate_sender, certificate_receiver) =
oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let leader_pk = participants[1].clone();
let (mut leader_sender, _leader_receiver) =
oracle.control(leader_pk.clone()).register(0, TEST_QUOTA).await.unwrap();
oracle
.add_link(leader_pk.clone(), me.clone(), link.clone())
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let leader = Participant::new(1);
for v in 1..skip_timeout {
let view = View::new(v);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none(), "view {v} should be active (before skip_timeout)");
}
let view = View::new(skip_timeout);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(
nullify.is_some(),
"view {skip_timeout} should be inactive (leader hasn't voted in {skip_timeout} views)"
);
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let leader_vote = Notarize::sign(&schemes[1], proposal).unwrap();
leader_sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(leader_vote).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let view = View::new(skip_timeout + 1);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(
nullify.is_none(),
"view {} should be active (leader voted in view {})",
skip_timeout + 1,
skip_timeout
);
let view = View::new(100);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(
nullify.is_some(),
"view 100 should be inactive (leader was last seen in view {skip_timeout})"
);
let self_leader = Participant::new(0);
let view = View::new(101);
let nullify = batcher_mailbox
.update(view, self_leader, View::zero(), None)
.await;
assert!(
nullify.is_none(),
"local leader inactivity should be suppressed"
);
});
}
#[test_traced]
fn test_leader_activity_detection() {
leader_activity_detection(bls12381_threshold_vrf::fixture::<MinPk, _>);
leader_activity_detection(bls12381_threshold_vrf::fixture::<MinSig, _>);
leader_activity_detection(bls12381_threshold_std::fixture::<MinPk, _>);
leader_activity_detection(bls12381_threshold_std::fixture::<MinSig, _>);
leader_activity_detection(bls12381_multisig::fixture::<MinPk, _>);
leader_activity_detection(bls12381_multisig::fixture::<MinSig, _>);
leader_activity_detection(ed25519::fixture);
leader_activity_detection(secp256r1::fixture);
}
fn leader_nullify_marks_active<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_nullify_activity_test".to_vec();
let epoch = Epoch::new(333);
let skip_timeout = 5u64;
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(skip_timeout),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, _voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let leader_pk = participants[1].clone();
let (mut leader_sender, _leader_receiver) = oracle
.control(leader_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(leader_pk.clone(), me.clone(), link)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let leader = Participant::new(1);
for v in 1..=skip_timeout {
let view = View::new(v);
let _ = batcher_mailbox
.update(view, leader, View::zero(), None)
.await;
}
let round = Round::new(epoch, View::new(skip_timeout));
let leader_vote = Nullify::sign::<Sha256Digest>(&schemes[1], round).unwrap();
leader_sender
.send(
Recipients::One(me.clone()),
Vote::<S, Sha256Digest>::Nullify(leader_vote).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let next_view = View::new(skip_timeout + 1);
let nullify = batcher_mailbox
.update(next_view, leader, View::zero(), None)
.await;
assert!(
nullify.is_none(),
"leader should remain active with nullify activity"
);
});
}
#[test_traced]
fn test_leader_nullify_marks_active() {
leader_nullify_marks_active(bls12381_threshold_vrf::fixture::<MinPk, _>);
leader_nullify_marks_active(bls12381_threshold_vrf::fixture::<MinSig, _>);
leader_nullify_marks_active(bls12381_threshold_std::fixture::<MinPk, _>);
leader_nullify_marks_active(bls12381_threshold_std::fixture::<MinSig, _>);
leader_nullify_marks_active(bls12381_multisig::fixture::<MinPk, _>);
leader_nullify_marks_active(bls12381_multisig::fixture::<MinSig, _>);
leader_nullify_marks_active(ed25519::fixture);
leader_nullify_marks_active(secp256r1::fixture);
}
fn leader_certificate_marks_active<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_certificate_activity_test".to_vec();
let epoch = Epoch::new(333);
let skip_timeout = 5u64;
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(skip_timeout),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let leader = Participant::new(1);
let leader_pk = participants[usize::from(leader)].clone();
let (mut leader_sender, _leader_receiver) = oracle
.control(leader_pk.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(leader_pk.clone(), me.clone(), link)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
for v in 1..skip_timeout {
let view = View::new(v);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none(), "view {v} should be active before skip_timeout");
}
let active_view = View::new(skip_timeout);
let nullify = batcher_mailbox
.update(active_view, leader, View::zero(), None)
.await;
assert!(
nullify.is_some(),
"leader should be inactive after {skip_timeout} silent views"
);
let round = Round::new(epoch, active_view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let finalization = build_finalization(&schemes, &proposal, quorum_size);
leader_sender
.send(
Recipients::One(me.clone()),
Certificate::Finalization(finalization.clone()).encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let output = voter_receiver.recv().await.unwrap();
assert!(
matches!(output, voter::Message::Verified(Certificate::Finalization(f), _) if f.view() == active_view)
);
let next_view = active_view.next();
let nullify = batcher_mailbox
.update(next_view, leader, View::zero(), None)
.await;
assert!(
nullify.is_none(),
"leader should remain active after relaying a certificate"
);
});
}
#[test_traced]
fn test_leader_certificate_marks_active() {
leader_certificate_marks_active(bls12381_threshold_vrf::fixture::<MinPk, _>);
leader_certificate_marks_active(bls12381_threshold_vrf::fixture::<MinSig, _>);
leader_certificate_marks_active(bls12381_threshold_std::fixture::<MinPk, _>);
leader_certificate_marks_active(bls12381_threshold_std::fixture::<MinSig, _>);
leader_certificate_marks_active(bls12381_multisig::fixture::<MinPk, _>);
leader_certificate_marks_active(bls12381_multisig::fixture::<MinSig, _>);
leader_certificate_marks_active(ed25519::fixture);
leader_certificate_marks_active(secp256r1::fixture);
}
fn leader_nullify_expire_on_view_entry<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_leader_nullify_expire_on_view_entry".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, _voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let leader_idx = Participant::new(2);
let leader_pk = participants[usize::from(leader_idx)].clone();
let (mut leader_sender, _leader_receiver) = oracle
.control(leader_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(
leader_pk.clone(),
me.clone(),
Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
},
)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let _ = batcher_mailbox
.update(View::new(1), Participant::new(1), View::zero(), None)
.await;
let buffered_view = View::new(2);
leader_sender
.send(
Recipients::One(me.clone()),
Vote::<S, Sha256Digest>::Nullify(
Nullify::sign::<Sha256Digest>(
&schemes[usize::from(leader_idx)],
Round::new(epoch, buffered_view),
)
.expect("nullify"),
)
.encode(),
true,
)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let nullify = batcher_mailbox
.update(buffered_view, leader_idx, View::zero(), None)
.await;
assert!(
nullify.is_some(),
"buffered leader nullify should skip timeout on view entry"
);
});
}
#[test_traced]
fn test_leader_nullify_expire_on_view_entry() {
leader_nullify_expire_on_view_entry(bls12381_threshold_vrf::fixture::<MinPk, _>);
leader_nullify_expire_on_view_entry(bls12381_threshold_vrf::fixture::<MinSig, _>);
leader_nullify_expire_on_view_entry(bls12381_threshold_std::fixture::<MinPk, _>);
leader_nullify_expire_on_view_entry(bls12381_threshold_std::fixture::<MinSig, _>);
leader_nullify_expire_on_view_entry(bls12381_multisig::fixture::<MinPk, _>);
leader_nullify_expire_on_view_entry(bls12381_multisig::fixture::<MinSig, _>);
leader_nullify_expire_on_view_entry(ed25519::fixture);
leader_nullify_expire_on_view_entry(secp256r1::fixture);
}
fn leader_nullify_wrong_view_no_expire<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let namespace = b"batcher_leader_nullify_wrong_view_no_expire".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let leader = Participant::new(2);
let leader_pk = participants[usize::from(leader)].clone();
let (mut leader_sender, _leader_receiver) = oracle
.control(leader_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(
leader_pk,
me.clone(),
Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
},
)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let current_view = View::new(2);
let _ = batcher_mailbox
.update(current_view, leader, View::zero(), None)
.await;
let wrong_view = current_view.next();
let leader_nullify = Nullify::sign::<Sha256Digest>(
&schemes[usize::from(leader)],
Round::new(epoch, wrong_view),
)
.expect("nullify");
leader_sender
.send(
Recipients::One(me),
Vote::<S, Sha256Digest>::Nullify(leader_nullify).encode(),
true,
)
.await
.unwrap();
let got_wrong_view_expire = select! {
message = voter_receiver.recv() => {
matches!(message, Some(voter::Message::Timeout(view, _)) if view == wrong_view)
},
_ = context.sleep(Duration::from_millis(100)) => false,
};
assert!(
!got_wrong_view_expire,
"must not fast-path timeout for a leader nullify in a non-current view"
);
});
}
#[test_traced]
fn test_leader_nullify_wrong_view_no_expire() {
leader_nullify_wrong_view_no_expire(bls12381_threshold_vrf::fixture::<MinPk, _>);
leader_nullify_wrong_view_no_expire(bls12381_threshold_vrf::fixture::<MinSig, _>);
leader_nullify_wrong_view_no_expire(bls12381_threshold_std::fixture::<MinPk, _>);
leader_nullify_wrong_view_no_expire(bls12381_threshold_std::fixture::<MinSig, _>);
leader_nullify_wrong_view_no_expire(bls12381_multisig::fixture::<MinPk, _>);
leader_nullify_wrong_view_no_expire(bls12381_multisig::fixture::<MinSig, _>);
leader_nullify_wrong_view_no_expire(ed25519::fixture);
leader_nullify_wrong_view_no_expire(secp256r1::fixture);
}
fn votes_skipped_for_finalized_views<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view1 = View::new(1);
let view2 = View::new(2);
let leader = Participant::new(1);
let nullify = batcher_mailbox
.update(view1, leader, View::zero(), None)
.await;
assert!(nullify.is_none());
let round1 = Round::new(epoch, view1);
let proposal1 = Proposal::new(round1, View::zero(), Sha256::hash(b"payload1"));
for i in 1..quorum_size {
let vote = Notarize::sign(&schemes[i], proposal1.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_notarize = Notarize::sign(&schemes[0], proposal1.clone()).unwrap();
batcher_mailbox
.constructed(Vote::Notarize(our_notarize))
.await;
loop {
let output = voter_receiver.recv().await.unwrap();
match output {
voter::Message::Proposal(_) => continue,
voter::Message::Verified(Certificate::Notarization(n), _) => {
assert_eq!(
n.view(),
view1,
"Should construct notarization for view above finalized"
);
break;
}
_ => panic!("Unexpected message type"),
}
}
let view3 = View::new(3);
let nullify = batcher_mailbox.update(view3, leader, view2, None).await;
assert!(nullify.is_none());
let round2 = Round::new(epoch, view2);
let proposal2 = Proposal::new(round2, view1, Sha256::hash(b"payload2"));
for i in 1..quorum_size {
let vote = Notarize::sign(&schemes[i], proposal2.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_notarize2 = Notarize::sign(&schemes[0], proposal2.clone()).unwrap();
batcher_mailbox
.constructed(Vote::Notarize(our_notarize2))
.await;
select! {
msg = voter_receiver.recv() => match msg {
Some(voter::Message::Proposal(_)) => {}
Some(voter::Message::Verified(cert, _)) if cert.view() == view2 => {
panic!("should not receive any certificate for the finalized view");
}
_ => {}
},
_ = context.sleep(Duration::from_millis(200)) => {},
};
});
}
#[test_traced]
fn test_votes_skipped_for_finalized_views() {
votes_skipped_for_finalized_views(bls12381_threshold_vrf::fixture::<MinPk, _>);
votes_skipped_for_finalized_views(bls12381_threshold_vrf::fixture::<MinSig, _>);
votes_skipped_for_finalized_views(bls12381_threshold_std::fixture::<MinPk, _>);
votes_skipped_for_finalized_views(bls12381_threshold_std::fixture::<MinSig, _>);
votes_skipped_for_finalized_views(bls12381_multisig::fixture::<MinPk, _>);
votes_skipped_for_finalized_views(bls12381_multisig::fixture::<MinSig, _>);
votes_skipped_for_finalized_views(ed25519::fixture);
votes_skipped_for_finalized_views(secp256r1::fixture);
}
fn latest_vote_metric_tracking<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum_size = quorum(n) as usize;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(
context.clone(),
participants.clone(),
)
.await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_context = context.with_label("batcher");
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(batcher_context.clone(), batcher_cfg);
let buffer = batcher_context.encode();
for participant in &participants {
let expected = format!("latest_vote{{peer=\"{}\"}} 0", participant);
assert!(
buffer.contains(&expected),
"Expected metric for participant {} to be initialized to 0, got: {}",
participant,
buffer
);
}
let (voter_sender, mut voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
let mut participant_senders = Vec::new();
for (i, pk) in participants.iter().enumerate() {
if i == 0 {
participant_senders.push(None);
continue;
}
let (sender, _receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
oracle
.add_link(pk.clone(), me.clone(), link.clone())
.await
.unwrap();
participant_senders.push(Some(sender));
}
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let leader = Participant::new(1);
let warmup_vote = Nullify::sign::<Sha256Digest>(
&schemes[usize::from(leader)],
Round::new(epoch, View::new(1)),
)
.unwrap();
if let Some(ref mut sender) = participant_senders[usize::from(leader)] {
sender
.send(
Recipients::One(me.clone()),
Vote::<S, Sha256Digest>::Nullify(warmup_vote).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(50)).await;
let view = View::new(5);
let nullify = batcher_mailbox.update(view, leader, View::zero(), None).await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
for i in 1..quorum_size {
let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[i] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote).encode(),
true,
)
.await
.unwrap();
}
}
let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
batcher_mailbox
.constructed(Vote::Notarize(our_vote))
.await;
context.sleep(Duration::from_millis(100)).await;
loop {
let output = voter_receiver.recv().await.unwrap();
match output {
voter::Message::Proposal(_) => continue,
voter::Message::Verified(Certificate::Notarization(n), _) => {
assert_eq!(n.view(), view, "Should construct notarization");
break;
}
_ => panic!("Unexpected message type"),
}
}
let buffer = batcher_context.encode();
for (i, participant) in participants.iter().enumerate().take(quorum_size).skip(1) {
let expected = format!("latest_vote{{peer=\"{}\"}} 5", participant);
assert!(
buffer.contains(&expected),
"Expected participant {} to have latest_vote=5, got: {}",
i,
buffer
);
}
let late_voter = quorum_size;
let late_vote = Notarize::sign(&schemes[late_voter], proposal.clone()).unwrap();
if let Some(ref mut sender) = participant_senders[late_voter] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(late_vote).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(100)).await;
let buffer = batcher_context.encode();
let expected_late = format!("latest_vote{{peer=\"{}\"}} 5", participants[late_voter]);
assert!(
buffer.contains(&expected_late),
"Expected late voter (participant {}) to have latest_vote=5 even after quorum, got: {}",
late_voter,
buffer
);
let view3 = View::new(3);
let round3 = Round::new(epoch, view3);
let proposal3 = Proposal::new(round3, View::zero(), Sha256::hash(b"payload3"));
let vote_v3 = Notarize::sign(&schemes[1], proposal3).unwrap();
if let Some(ref mut sender) = participant_senders[1] {
sender
.send(
Recipients::One(me.clone()),
Vote::Notarize(vote_v3).encode(),
true,
)
.await
.unwrap();
}
context.sleep(Duration::from_millis(100)).await;
let buffer = batcher_context.encode();
let expected_v5 = format!("latest_vote{{peer=\"{}\"}} 5", participants[1]);
assert!(
buffer.contains(&expected_v5),
"Expected participant 1 to still have latest_vote=5 after receiving lower view vote, got: {}",
buffer
);
});
}
#[test_traced]
fn test_latest_vote_metric_tracking() {
latest_vote_metric_tracking(bls12381_threshold_vrf::fixture::<MinPk, _>);
latest_vote_metric_tracking(bls12381_threshold_vrf::fixture::<MinSig, _>);
latest_vote_metric_tracking(bls12381_threshold_std::fixture::<MinPk, _>);
latest_vote_metric_tracking(bls12381_threshold_std::fixture::<MinSig, _>);
latest_vote_metric_tracking(bls12381_multisig::fixture::<MinPk, _>);
latest_vote_metric_tracking(bls12381_multisig::fixture::<MinSig, _>);
latest_vote_metric_tracking(ed25519::fixture);
latest_vote_metric_tracking(secp256r1::fixture);
}
fn duplicate_vote_with_different_attestation_blocks_peer<S, F, V>(mut fixture: F, sign_vote: V)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
V: Fn(&S, Proposal<Sha256Digest>) -> Vote<S, Sha256Digest> + Send + 'static,
{
let n = 5;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, _voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let sender_pk = participants[1].clone();
let (mut sender, _receiver) = oracle
.control(sender_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(sender_pk.clone(), me.clone(), link)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let nullify = batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
let vote1 = sign_vote(&schemes[1], proposal.clone());
sender
.send(Recipients::One(me.clone()), vote1.encode(), true)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
blocked.is_empty(),
"No peers should be blocked after first vote"
);
sender
.send(Recipients::One(me.clone()), vote1.encode(), true)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
blocked.is_empty(),
"Duplicate vote should be ignored, not blocked"
);
let vote2 = sign_vote(&schemes[2], proposal.clone());
sender
.send(Recipients::One(me.clone()), vote2.encode(), true)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
blocked.iter().any(|(_, blocked)| blocked == &sender_pk),
"Sender should be blocked for vote with mismatched signer"
);
});
}
fn sign_notarize<S: Scheme<Sha256Digest>>(
scheme: &S,
p: Proposal<Sha256Digest>,
) -> Vote<S, Sha256Digest> {
Vote::Notarize(Notarize::sign(scheme, p).unwrap())
}
fn sign_finalize<S: Scheme<Sha256Digest>>(
scheme: &S,
p: Proposal<Sha256Digest>,
) -> Vote<S, Sha256Digest> {
Vote::Finalize(Finalize::sign(scheme, p).unwrap())
}
#[test_traced]
fn test_duplicate_notarize_with_different_attestation_blocks_peer() {
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_vrf::fixture::<MinPk, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_vrf::fixture::<MinSig, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_std::fixture::<MinPk, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_std::fixture::<MinSig, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_multisig::fixture::<MinPk, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_multisig::fixture::<MinSig, _>,
sign_notarize,
);
duplicate_vote_with_different_attestation_blocks_peer(ed25519::fixture, sign_notarize);
duplicate_vote_with_different_attestation_blocks_peer(secp256r1::fixture, sign_notarize);
}
#[test_traced]
fn test_duplicate_finalize_with_different_attestation_blocks_peer() {
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_vrf::fixture::<MinPk, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_vrf::fixture::<MinSig, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_std::fixture::<MinPk, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_threshold_std::fixture::<MinSig, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_multisig::fixture::<MinPk, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(
bls12381_multisig::fixture::<MinSig, _>,
sign_finalize,
);
duplicate_vote_with_different_attestation_blocks_peer(ed25519::fixture, sign_finalize);
duplicate_vote_with_different_attestation_blocks_peer(secp256r1::fixture, sign_finalize);
}
fn conflicting_vote_creates_evidence<S, F, V, A>(
mut fixture: F,
sign_vote: V,
is_expected_activity: A,
) where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
V: Fn(&S, Proposal<Sha256Digest>) -> Vote<S, Sha256Digest> + Send + 'static,
A: Fn(&Activity<S, Sha256Digest>) -> bool + Send + 'static,
{
let n = 5;
let namespace = b"batcher_test".to_vec();
let epoch = Epoch::new(333);
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);
let oracle = start_test_network_with_peers(context.clone(), participants.clone()).await;
let reporter_cfg = mocks::reporter::Config {
participants: schemes[0].participants().clone(),
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let me = participants[0].clone();
let batcher_cfg = Config {
scheme: schemes[0].clone(),
blocker: oracle.control(me.clone()),
reporter: reporter.clone(),
relay: MockRelay::new(),
strategy: Sequential,
activity_timeout: ViewDelta::new(10),
skip_timeout: ViewDelta::new(5),
epoch,
mailbox_size: 128,
forwarding: ForwardingPolicy::Disabled,
};
let (batcher, mut batcher_mailbox) = Actor::new(context.clone(), batcher_cfg);
let (voter_sender, _voter_receiver) =
mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
let voter_mailbox = voter::Mailbox::new(voter_sender);
let (_vote_sender, vote_receiver) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_certificate_sender, certificate_receiver) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();
let sender_pk = participants[1].clone();
let (mut sender, _receiver) = oracle
.control(sender_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let link = Link {
latency: Duration::from_millis(1),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(sender_pk.clone(), me.clone(), link)
.await
.unwrap();
batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
let view = View::new(1);
let nullify = batcher_mailbox
.update(view, Participant::new(1), View::zero(), None)
.await;
assert!(nullify.is_none());
let round = Round::new(epoch, view);
let proposal1 = Proposal::new(round, View::zero(), Sha256::hash(b"payload1"));
let proposal2 = Proposal::new(round, View::zero(), Sha256::hash(b"payload2"));
let vote1 = sign_vote(&schemes[1], proposal1);
sender
.send(Recipients::One(me.clone()), vote1.encode(), true)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
blocked.is_empty(),
"No peers should be blocked after first vote"
);
let vote2 = sign_vote(&schemes[1], proposal2);
sender
.send(Recipients::One(me.clone()), vote2.encode(), true)
.await
.unwrap();
context.sleep(Duration::from_millis(50)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
blocked.iter().any(|(_, blocked)| blocked == &sender_pk),
"Sender should be blocked for conflicting vote"
);
let faults = reporter.faults.lock();
let has_expected_fault = faults
.get(&sender_pk)
.and_then(|sf| sf.get(&view))
.is_some_and(|vf| vf.iter().any(&is_expected_activity));
assert!(has_expected_fault, "Should have conflicting fault reported");
});
}
fn is_conflicting_notarize<S: Scheme<Sha256Digest>>(a: &Activity<S, Sha256Digest>) -> bool {
matches!(a, Activity::ConflictingNotarize(_))
}
fn is_conflicting_finalize<S: Scheme<Sha256Digest>>(a: &Activity<S, Sha256Digest>) -> bool {
matches!(a, Activity::ConflictingFinalize(_))
}
#[test_traced]
fn test_conflicting_notarize_creates_evidence() {
conflicting_vote_creates_evidence(
bls12381_threshold_vrf::fixture::<MinPk, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_vrf::fixture::<MinSig, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_std::fixture::<MinPk, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_std::fixture::<MinSig, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(
bls12381_multisig::fixture::<MinPk, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(
bls12381_multisig::fixture::<MinSig, _>,
sign_notarize,
is_conflicting_notarize,
);
conflicting_vote_creates_evidence(ed25519::fixture, sign_notarize, is_conflicting_notarize);
conflicting_vote_creates_evidence(
secp256r1::fixture,
sign_notarize,
is_conflicting_notarize,
);
}
#[test_traced]
fn test_conflicting_finalize_creates_evidence() {
conflicting_vote_creates_evidence(
bls12381_threshold_vrf::fixture::<MinPk, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_vrf::fixture::<MinSig, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_std::fixture::<MinPk, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(
bls12381_threshold_std::fixture::<MinSig, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(
bls12381_multisig::fixture::<MinPk, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(
bls12381_multisig::fixture::<MinSig, _>,
sign_finalize,
is_conflicting_finalize,
);
conflicting_vote_creates_evidence(ed25519::fixture, sign_finalize, is_conflicting_finalize);
conflicting_vote_creates_evidence(
secp256r1::fixture,
sign_finalize,
is_conflicting_finalize,
);
}
}