use bpcon::config::BPConConfig;
use bpcon::error::LaunchBallotError;
use bpcon::leader::{DefaultLeaderElector, LeaderElector};
use bpcon::message::{Message1bContent, MessagePacket};
use bpcon::test_mocks::{MockParty, MockValue, MockValueSelector};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration, Instant};
use futures::future::join_all;
/// Everything `create_parties` hands back for a test run: the parties themselves plus, for
/// each party, the `MessagePacket` receiver and sender that `MockParty::new` returned with it.
///
/// The three vectors are index-aligned: position `i` of each vector belongs to the party
/// that was created with id `i`. The propagation helpers in this file read packets from the
/// receivers and push packets into the senders.
type PartiesWithChannels = (
Vec<MockParty>,
Vec<UnboundedReceiver<MessagePacket>>,
Vec<UnboundedSender<MessagePacket>>,
);
/// Builds one `MockParty` for every entry of `cfg.party_weights`.
///
/// Ids are assigned sequentially starting at zero. Each party receives its own clone of
/// `cfg`, a `MockValueSelector` and a freshly constructed `DefaultLeaderElector`.
///
/// Returns the parties and their channel halves as three index-aligned vectors.
fn create_parties(cfg: BPConConfig) -> PartiesWithChannels {
    let amount = cfg.party_weights.len();
    let mut parties = Vec::with_capacity(amount);
    let mut receivers = Vec::with_capacity(amount);
    let mut senders = Vec::with_capacity(amount);

    for id in 0..amount {
        let (party, receiver, sender) = MockParty::new(
            id as u64,
            cfg.clone(),
            MockValueSelector,
            Box::new(DefaultLeaderElector::new()),
        );
        parties.push(party);
        receivers.push(receiver);
        senders.push(sender);
    }

    (parties, receivers, senders)
}
/// Spawns one tokio task per party, each of which runs that party's `launch_ballot`.
///
/// The parties are moved into their tasks. The returned handles are in the same order as
/// the input vector and resolve to the ballot outcome of the corresponding party.
fn launch_parties(
    parties: Vec<MockParty>,
) -> Vec<JoinHandle<Result<MockValue, LaunchBallotError>>> {
    let mut handles = Vec::with_capacity(parties.len());
    for mut party in parties {
        handles.push(tokio::spawn(async move { party.launch_ballot().await }));
    }
    handles
}
/// Polls every receiver once, without blocking, and gathers whatever was ready.
///
/// At most one packet is taken from each receiver per call; a receiver whose `try_recv`
/// reports an error (nothing queued, or channel closed) simply contributes nothing.
/// Packets are returned in receiver order.
fn collect_messages(receivers: &mut [UnboundedReceiver<MessagePacket>]) -> Vec<MessagePacket> {
    let mut pending = Vec::new();
    for receiver in receivers.iter_mut() {
        if let Ok(packet) = receiver.try_recv() {
            pending.push(packet);
        }
    }
    pending
}
/// Delivers every packet in `messages` to every party except the one that sent it.
///
/// The sender of a packet is identified by comparing `msg.routing.sender` with the *index*
/// of each entry in `senders`, so `senders[i]` is expected to lead to the party with id `i`.
///
/// Delivery is best-effort: a channel whose receiving half has already been dropped is
/// skipped instead of panicking. Parties drop their inbound channel as soon as their ballot
/// task ends, which routinely happens while other parties are still running (and before the
/// test aborts the propagation task). Panicking here would kill the propagation task and cut
/// off the parties that are still waiting for messages.
fn broadcast_messages(messages: Vec<MessagePacket>, senders: &[UnboundedSender<MessagePacket>]) {
    for msg in &messages {
        for (i, sender_into) in senders.iter().enumerate() {
            // Never echo a packet back to the party it originated from.
            if msg.routing.sender == i as u64 {
                continue;
            }
            // `send` only fails when the receiver is gone; that party is done, so there is
            // nobody left to deliver to and the error carries no further information.
            let _ = sender_into.send(msg.clone());
        }
    }
}
/// Spawns the background task that plays the role of the network between the parties.
///
/// The task loops forever: it gathers the packets currently available via
/// `collect_messages`, forwards them with `broadcast_messages`, then sleeps for one
/// millisecond before polling again. It never finishes on its own; callers stop it by
/// calling `abort` on the returned handle.
fn propagate_p2p(
    mut receivers: Vec<UnboundedReceiver<MessagePacket>>,
    senders: Vec<UnboundedSender<MessagePacket>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let poll_pause = Duration::from_millis(1);
        loop {
            let pending = collect_messages(&mut receivers);
            broadcast_messages(pending, &senders);
            sleep(poll_pause).await;
        }
    })
}
/// Waits for every ballot task to finish and returns their outcomes in task order.
///
/// # Panics
///
/// Panics if any task could not be joined (the join result itself is an error), since the
/// join result is unwrapped.
async fn await_results(
    tasks: Vec<JoinHandle<Result<MockValue, LaunchBallotError>>>,
) -> Vec<Result<MockValue, LaunchBallotError>> {
    let joined = join_all(tasks).await;
    let mut outcomes = Vec::with_capacity(joined.len());
    for join_result in joined {
        outcomes.push(join_result.unwrap());
    }
    outcomes
}
/// Checks that a finished ballot produced a consensus.
///
/// Every failed outcome is reported on stderr. The remaining successful values must all be
/// equal; the agreed value is then printed on stdout.
///
/// # Panics
///
/// Panics when no party succeeded, or when the successful parties returned different values.
fn analyze_ballot(results: Vec<Result<MockValue, LaunchBallotError>>) {
    // Single pass: report failures as they are encountered, keep the successful values.
    let mut values: Vec<MockValue> = Vec::with_capacity(results.len());
    for outcome in results {
        match outcome {
            Ok(value) => values.push(value),
            Err(err) => eprintln!("Error during ballot: {:?}", err),
        }
    }

    match values.split_first() {
        None => panic!("No consensus, all parties failed."),
        Some((agreed, others)) => {
            let unanimous = others.iter().all(|candidate| candidate == agreed);
            assert!(
                unanimous,
                "No consensus, different values found: {:?}",
                values
            );
            println!("Consensus reached with value: {:?}", agreed);
        }
    }
}
/// Runs a ballot in which the parties listed in `faulty_ids` never take part.
///
/// A faulty party is modelled as a silent one: it is not launched, so it never emits a
/// packet, but its channels stay exactly where they were.
///
/// The channels are deliberately *not* removed from the vectors. `broadcast_messages`
/// recognises the originator of a packet by the index of its channel, so channel index must
/// stay equal to party id. Removing entries would shift every later party down by one,
/// which makes some packets bounce back to their sender and hides them from another party.
/// Removing by index in a loop is also wrong for more than one id, because each removal
/// shifts the positions the remaining ids refer to.
///
/// Returns the outcomes of the parties that did run, in ascending id order.
///
/// # Panics
///
/// Panics if an id in `faulty_ids` does not refer to an existing party.
async fn run_ballot_faulty_party(
    parties: PartiesWithChannels,
    faulty_ids: Vec<usize>,
) -> Vec<Result<MockValue, LaunchBallotError>> {
    let (parties, receivers, senders) = parties;

    for id in &faulty_ids {
        assert!(
            *id < parties.len(),
            "Faulty party id {} is out of range for {} parties",
            id,
            parties.len()
        );
    }

    let mut active = Vec::with_capacity(parties.len());
    let mut silent = Vec::with_capacity(faulty_ids.len());
    for (id, party) in parties.into_iter().enumerate() {
        if faulty_ids.contains(&id) {
            silent.push(party);
        } else {
            active.push(party);
        }
    }

    let ballot_tasks = launch_parties(active);
    let p2p_task = propagate_p2p(receivers, senders);
    let results = await_results(ballot_tasks).await;
    p2p_task.abort();

    // The silent parties are kept alive until here so that packets addressed to them can
    // still be queued while the ballot runs (assuming a party owns the other half of its
    // channels, dropping it earlier would close them under the propagation task).
    drop(silent);
    results
}
/// All parties of the default configuration run the ballot with an undisturbed network and
/// must agree on a value.
#[tokio::test]
async fn test_ballot_happy_case() {
    let (parties, receivers, senders) = create_parties(BPConConfig::default());

    let handles = launch_parties(parties);
    let network = propagate_p2p(receivers, senders);

    let outcomes = await_results(handles).await;
    network.abort();

    analyze_ballot(outcomes);
}
/// A non-leader party (id 3) is faulty; the remaining parties must still reach consensus.
///
/// The test first verifies that the chosen faulty party is not the elected leader, because
/// the scenario is only meaningful for an ordinary participant.
#[tokio::test]
async fn test_ballot_faulty_party_common() {
    let setup = create_parties(BPConConfig::default());

    let leader_elector = DefaultLeaderElector::new();
    let leader = leader_elector.elect_leader(&setup.0[0]).unwrap();

    let faulty_ids: Vec<usize> = vec![3];
    for &faulty in &faulty_ids {
        assert_ne!(
            faulty as u64, leader,
            "Should not fail the leader for the test to pass"
        );
    }

    let outcomes = run_ballot_faulty_party(setup, faulty_ids).await;
    analyze_ballot(outcomes);
}
/// The elected leader is the faulty party; every remaining party is expected to fail its
/// ballot.
#[tokio::test]
async fn test_ballot_faulty_party_leader() {
    let setup = create_parties(BPConConfig::default());

    let leader_elector = DefaultLeaderElector::new();
    let leader = leader_elector.elect_leader(&setup.0[0]).unwrap();

    let outcomes = run_ballot_faulty_party(setup, vec![leader as usize]).await;

    let everyone_failed = outcomes.iter().all(Result::is_err);
    assert!(
        everyone_failed,
        "All parties should have failed having a faulty leader in the consensus."
    );
}
/// Party 2 behaves maliciously: its genuine packets are never forwarded and a forged 1b
/// message is injected in its name instead. The ballot must still end in consensus.
///
/// The forged message claims the ballot after the current one and reports a vote in that
/// same ballot without any voted value.
#[tokio::test]
async fn test_ballot_malicious_party() {
    let (parties, mut receivers, senders) = create_parties(BPConConfig::default());

    let leader_elector = DefaultLeaderElector::new();
    let leader = leader_elector.elect_leader(&parties[0]).unwrap();

    const MALICIOUS_PARTY_ID: u64 = 2;
    assert_ne!(
        MALICIOUS_PARTY_ID, leader,
        "Should not make malicious the leader for the test to pass"
    );

    let forged_ballot = parties[0].ballot() + 1;
    let content = &Message1bContent {
        ballot: forged_ballot,
        last_ballot_voted: Some(forged_ballot),
        last_value_voted: None,
    };
    let malicious_msg = content.pack(MALICIOUS_PARTY_ID).unwrap();

    let ballot_tasks = launch_parties(parties);

    // Custom network task: same idea as `propagate_p2p`, but it skips the malicious party's
    // outgoing channel and periodically adds the forged packet to the batch.
    let p2p_task = tokio::spawn(async move {
        let malicious_message_interval = Duration::from_millis(100);
        let mut last_malicious_message_time = Instant::now();

        loop {
            let mut messages = Vec::new();
            for (i, receiver) in receivers.iter_mut().enumerate() {
                // The malicious party's own receiver is never polled.
                if i == MALICIOUS_PARTY_ID as usize {
                    continue;
                }
                if let Ok(packet) = receiver.try_recv() {
                    messages.push(packet);
                }
            }

            if last_malicious_message_time.elapsed() >= malicious_message_interval {
                messages.push(malicious_msg.clone());
                last_malicious_message_time = Instant::now();
            }

            broadcast_messages(messages, &senders);
            sleep(Duration::from_millis(100)).await;
        }
    });

    let results = await_results(ballot_tasks).await;
    p2p_task.abort();

    analyze_ballot(results);
}
/// Stress scenario with 999 equally weighted parties and hand-picked stage timeouts.
///
/// Ignored by default because of its run time; launch it manually when needed.
#[tokio::test]
#[ignore = "takes 20 secs to run, launch manually"]
async fn test_ballot_many_parties() {
    const AMOUNT_OF_PARTIES: usize = 999;

    let party_weights = vec![1; AMOUNT_OF_PARTIES];
    let threshold = BPConConfig::compute_bft_threshold(party_weights.clone());

    let secs = Duration::from_secs;
    let cfg = BPConConfig {
        party_weights,
        threshold,
        launch_at: Instant::now(),
        launch1a_timeout: secs(0),
        launch1b_timeout: secs(1),
        launch2a_timeout: secs(5),
        launch2av_timeout: secs(7),
        launch2b_timeout: secs(12),
        finalize_timeout: secs(19),
        grace_period: secs(0),
    };

    let (parties, receivers, senders) = create_parties(cfg);

    let handles = launch_parties(parties);
    let network = propagate_p2p(receivers, senders);

    let outcomes = await_results(handles).await;
    network.abort();

    analyze_ballot(outcomes);
}
/// Two parties, one of which carries the largest representable weight (`u64::MAX`); the
/// ballot must still reach consensus.
// NOTE(review): presumably guards against overflow when weights are summed — confirm
// against the crate's threshold/weight arithmetic.
#[tokio::test]
async fn test_ballot_max_weight() {
    let weights = vec![u64::MAX, 1];
    let threshold = BPConConfig::compute_bft_threshold(weights.clone());
    let cfg = BPConConfig::with_default_timeouts(weights, threshold);

    let (parties, receivers, senders) = create_parties(cfg);

    let handles = launch_parties(parties);
    let network = propagate_p2p(receivers, senders);

    let outcomes = await_results(handles).await;
    network.abort();

    analyze_ballot(outcomes);
}
/// Five parties where one weight (100) dwarfs the others (1, 2, 3, 4); the ballot must
/// still reach consensus.
// NOTE(review): going by the test name, this targets an underflow in the weight
// bookkeeping — confirm against the crate's implementation.
#[tokio::test]
async fn test_ballot_weights_underflow() {
    let weights = vec![100, 1, 2, 3, 4];
    let threshold = BPConConfig::compute_bft_threshold(weights.clone());
    let cfg = BPConConfig::with_default_timeouts(weights, threshold);

    let (parties, receivers, senders) = create_parties(cfg);

    let handles = launch_parties(parties);
    let network = propagate_p2p(receivers, senders);

    let outcomes = await_results(handles).await;
    network.abort();

    analyze_ballot(outcomes);
}