use futures::prelude::*;
use futures::sync::mpsc::{self, UnboundedReceiver};
#[cfg(feature = "std")]
use log::trace;
use std::collections::VecDeque;
use std::sync::Arc;
use crate::round::State as RoundState;
use crate::{
CatchUp, Chain, Commit, CompactCommit, Equivocation, Message, Prevote, Precommit,
PrimaryPropose, SignedMessage, BlockNumberOps, validate_commit, CommitValidationResult,
HistoricalVotes,
};
use crate::voter_set::VoterSet;
use past_rounds::PastRounds;
use voting_round::{VotingRound, State as VotingRoundState};
mod past_rounds;
mod voting_round;
/// Environment a voter runs in: extends `Chain<H, N>` with the timers,
/// per-round communication channels and notification hooks used by the
/// voter code in this file.
pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
/// Future used as a timer; resolves to `()` or fails with `Self::Error`.
type Timer: Future<Item=(),Error=Self::Error>;
/// Identifier of a voter; must be ordered, cloneable, comparable and debug-printable.
type Id: Ord + Clone + Eq + ::std::fmt::Debug;
/// Signature type carried by signed messages.
type Signature: Eq + Clone;
/// Stream of signed messages coming in for a round (see `RoundData::incoming`).
type In: Stream<Item=SignedMessage<H, N, Self::Signature, Self::Id>, Error=Self::Error>;
/// Sink for the messages sent out for a round (see `RoundData::outgoing`).
type Out: Sink<SinkItem=Message<H, N>, SinkError=Self::Error>;
/// Error type of the environment; must be constructible from `crate::Error`.
type Error: From<crate::Error> + ::std::error::Error;
/// Produces the `RoundData` (voter id, timers, incoming stream and outgoing
/// sink) for the round with the given number.
fn round_data(&self, round: u64) -> RoundData<
Self::Id,
Self::Timer,
Self::In,
Self::Out,
>;
/// Returns a timer.
/// NOTE(review): not called in this file; presumably used to delay the
/// broadcast of a commit message — confirm against `voting_round`.
fn round_commit_timer(&self) -> Self::Timer;
/// Hook receiving a round number and a primary proposal.
/// NOTE(review): not called in this file; presumably invoked when the local
/// voter issues a primary proposal — confirm.
fn proposed(&self, round: u64, propose: PrimaryPropose<H, N>) -> Result<(), Self::Error>;
/// Hook receiving a round number and a prevote.
/// NOTE(review): not called in this file; presumably invoked when the local
/// voter casts its prevote — confirm.
fn prevoted(&self, round: u64, prevote: Prevote<H, N>) -> Result<(), Self::Error>;
/// Hook receiving a round number and a precommit.
/// NOTE(review): not called in this file; presumably invoked when the local
/// voter casts its precommit — confirm.
fn precommitted(&self, round: u64, precommit: Precommit<H, N>) -> Result<(), Self::Error>;
/// Receives the number, round state, DAG base and historical votes of a
/// round. In this file it is invoked right before the best round is replaced
/// by its successor, and for a round imported from a catch-up message.
fn completed(
&self,
round: u64,
state: RoundState<H, N>,
base: (H, N),
votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
) -> Result<(), Self::Error>;
/// Same arguments as `completed`.
/// NOTE(review): not called in this file; presumably invoked when a
/// background round is finally dropped — confirm against `past_rounds`.
fn concluded(
&self,
round: u64,
state: RoundState<H, N>,
base: (H, N),
votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
) -> Result<(), Self::Error>;
/// Receives the hash and number of a finalized block, the round number and
/// the commit that goes with it. The `Voter` in this file only calls it when
/// the block number is greater than the last finalized number it tracks.
fn finalize_block(&self, hash: H, number: N, round: u64, commit: Commit<H, N, Self::Signature, Self::Id>) -> Result<(), Self::Error>;
/// Hook receiving a prevote equivocation observed in `round`.
/// NOTE(review): not called in this file — confirm the call site.
fn prevote_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Prevote<H, N>, Self::Signature>);
/// Hook receiving a precommit equivocation observed in `round`.
/// NOTE(review): not called in this file — confirm the call site.
fn precommit_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Precommit<H, N>, Self::Signature>);
}
/// Messages the voter sends out on its global (not per-round) channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommunicationOut<H, N, S, Id> {
/// A commit message: the round number followed by the commit itself.
Commit(u64, Commit<H, N, S, Id>),
}
/// Outcome of processing an incoming commit, handed to the `Callback` that
/// accompanied the commit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitProcessingOutcome {
    /// The commit was accepted.
    Good(GoodCommit),
    /// The commit failed validation; the payload carries the validation counters.
    Bad(BadCommit),
}

#[cfg(any(test, feature = "test-helpers"))]
impl CommitProcessingOutcome {
    /// Builds a `Good` outcome. Only available in tests or with the
    /// `test-helpers` feature.
    pub fn good() -> Self {
        let accepted = GoodCommit::new();
        CommitProcessingOutcome::Good(accepted)
    }

    /// Builds a `Bad` outcome from a default (empty) validation result. Only
    /// available in tests or with the `test-helpers` feature.
    pub fn bad() -> Self {
        let empty_validation = CommitValidationResult::<(), ()>::default();
        CommitProcessingOutcome::Bad(BadCommit::from(empty_validation))
    }
}
/// Payload of `CommitProcessingOutcome::Good`; carries no data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCommit {
    // Private unit field: code outside this module cannot write the struct
    // literal and has to go through `new`.
    _priv: (),
}

impl GoodCommit {
    /// Creates the (only possible) value of this type.
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
/// Payload of `CommitProcessingOutcome::Bad`: a set of counters describing a
/// commit that failed validation. Values are filled in by the
/// `From<CommitValidationResult>` conversion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCommit {
// Private unit field: prevents constructing the struct literally from outside this module.
_priv: (),
// Counters taken over from the validation result of the commit.
num_precommits: usize,
num_duplicated_precommits: usize,
num_equivocations: usize,
num_invalid_voters: usize,
}
impl BadCommit {
/// Returns the stored number of precommits.
pub fn num_precommits(&self) -> usize {
self.num_precommits
}
/// Returns the stored number of duplicated precommits.
pub fn num_duplicated(&self) -> usize {
self.num_duplicated_precommits
}
/// Returns the stored number of equivocations.
pub fn num_equivocations(&self) -> usize {
self.num_equivocations
}
/// Returns the stored number of invalid voters.
pub fn num_invalid_voters(&self) -> usize {
self.num_invalid_voters
}
}
impl<H, N> From<CommitValidationResult<H, N>> for BadCommit {
    /// Copies the four counters of a commit validation result into a
    /// `BadCommit`; every other part of the validation result is discarded.
    fn from(r: CommitValidationResult<H, N>) -> Self {
        let total = r.num_precommits;
        let duplicated = r.num_duplicated_precommits;
        let equivocations = r.num_equivocations;
        let invalid_voters = r.num_invalid_voters;

        BadCommit {
            _priv: (),
            num_precommits: total,
            num_duplicated_precommits: duplicated,
            num_equivocations: equivocations,
            num_invalid_voters: invalid_voters,
        }
    }
}
/// Outcome of processing an incoming catch-up message, handed to the
/// `Callback` that accompanied the message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatchUpProcessingOutcome {
    /// The catch-up message was accepted.
    Good(GoodCatchUp),
    /// The catch-up message was rejected.
    Bad(BadCatchUp),
    /// NOTE(review): never constructed in this file; presumably meant for
    /// catch-ups that are valid but bring nothing new — confirm.
    Useless,
}

#[cfg(any(test, feature = "test-helpers"))]
impl CatchUpProcessingOutcome {
    /// Builds a `Bad` outcome. Only available in tests or with the
    /// `test-helpers` feature.
    pub fn bad() -> Self {
        let rejected = BadCatchUp::new();
        CatchUpProcessingOutcome::Bad(rejected)
    }

    /// Builds a `Good` outcome. Only available in tests or with the
    /// `test-helpers` feature.
    pub fn good() -> Self {
        let accepted = GoodCatchUp::new();
        CatchUpProcessingOutcome::Good(accepted)
    }
}
/// Payload of `CatchUpProcessingOutcome::Good`; carries no data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCatchUp {
    // Private unit field: code outside this module cannot write the struct
    // literal and has to go through `new`.
    _priv: (),
}

impl GoodCatchUp {
    /// Creates the (only possible) value of this type.
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
/// Payload of `CatchUpProcessingOutcome::Bad`; carries no data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCatchUp {
    // Private unit field: code outside this module cannot write the struct
    // literal and has to go through `new`.
    _priv: (),
}

impl BadCatchUp {
    /// Creates the (only possible) value of this type.
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
/// Optional callback that receives a processing outcome of type `O`.
pub enum Callback<O> {
    /// No callback: running it does nothing.
    Blank,
    /// A boxed closure that is invoked with the outcome.
    Work(Box<dyn FnMut(O) + Send>),
}

// Test-only: the boxed closure cannot be cloned, so a clone is always `Blank`.
#[cfg(test)]
impl<O> Clone for Callback<O> {
    fn clone(&self) -> Self {
        Callback::Blank
    }
}

impl<O> Callback<O> {
    /// Invokes the wrapped closure with `o`; a `Blank` callback ignores `o`.
    pub fn run(&mut self, o: O) {
        if let Callback::Work(work) = self {
            work(o);
        }
    }
}
/// Messages the voter receives on its global (not per-round) channel.
/// `Clone` is only derived in test builds.
#[cfg_attr(test, derive(Clone))]
pub enum CommunicationIn<H, N, S, Id> {
/// A commit message: the round number, the compact commit, and the callback
/// that is run with the outcome of processing it.
Commit(u64, CompactCommit<H, N, S, Id>, Callback<CommitProcessingOutcome>),
/// A catch-up message and the callback that is run with the outcome of
/// processing it.
CatchUp(CatchUp<H, N, S, Id>, Callback<CatchUpProcessingOutcome>),
}
/// Per-round data returned by `Environment::round_data`.
pub struct RoundData<Id, Timer, Input, Output> {
/// Local voter id, if any.
/// NOTE(review): presumably `None` means the node only observes the round — confirm.
pub voter_id: Option<Id>,
/// Timer associated with prevoting.
pub prevote_timer: Timer,
/// Timer associated with precommitting.
pub precommit_timer: Timer,
/// Incoming messages for the round.
pub incoming: Input,
/// Outgoing messages for the round.
pub outgoing: Output,
}
/// A sink wrapper with an unbounded FIFO queue in front of it: items are
/// queued with `push` and handed to the inner sink when `poll` is called.
struct Buffered<S: Sink> {
    inner: S,
    buffer: VecDeque<S::SinkItem>,
}

impl<S: Sink> Buffered<S> {
    /// Wraps `inner` with an empty queue.
    fn new(inner: S) -> Buffered<S> {
        let buffer = VecDeque::new();
        Buffered { inner, buffer }
    }

    /// Appends `item` to the queue; nothing is sent until `poll` is called.
    fn push(&mut self, item: S::SinkItem) {
        self.buffer.push_back(item);
    }

    /// Sends as many queued items as the inner sink accepts and then drives
    /// the inner sink's `poll_complete`.
    ///
    /// Returns the result of `poll_complete` when the queue was emptied, and
    /// `NotReady` when items are still queued. Errors of the inner sink are
    /// propagated.
    fn poll(&mut self) -> Poll<(), S::SinkError> {
        let queue_state = self.schedule_all()?;
        // The inner sink is flushed in both cases.
        let flush_state = self.inner.poll_complete()?;

        match queue_state {
            Async::Ready(()) => Ok(flush_state),
            Async::NotReady => Ok(Async::NotReady),
        }
    }

    /// Moves queued items into the inner sink, front first, until the queue
    /// is empty (`Ready`) or the sink refuses an item (`NotReady`). A refused
    /// item is put back at the front so ordering is preserved.
    fn schedule_all(&mut self) -> Poll<(), S::SinkError> {
        while let Some(item) = self.buffer.pop_front() {
            if let AsyncSink::NotReady(rejected) = self.inner.start_send(item)? {
                self.buffer.push_front(rejected);
                return Ok(Async::NotReady);
            }
        }

        Ok(Async::Ready(()))
    }
}
/// Item sent over the finalized-notification channel: block hash, block
/// number, round number, and the accompanying commit.
type FinalizedNotification<H, N, E> = (
H,
N,
u64,
Commit<H, N, <E as Environment<H, N>>::Signature, <E as Environment<H, N>>::Id>,
);
/// Rebuilds the last round from its number, base and recorded votes.
///
/// A vote tracker is created for the round, wrapped in a `VotingRound` via
/// `VotingRound::completed`, and every vote in `last_round_votes` is fed to
/// it. Returns `None` as soon as a vote fails to import, or when the round
/// state is not completable after all votes were imported; otherwise returns
/// the rebuilt round.
fn instantiate_last_round<H, N, E: Environment<H, N>>(
    voters: VoterSet<E::Id>,
    last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
    last_round_number: u64,
    last_round_base: (H, N),
    finalized_sender: mpsc::UnboundedSender<FinalizedNotification<H, N, E>>,
    env: Arc<E>,
) -> Option<VotingRound<H, N, E>> where
    H: Clone + Eq + Ord + ::std::fmt::Debug,
    N: Copy + BlockNumberOps + ::std::fmt::Debug,
{
    let params = crate::round::RoundParams {
        voters,
        base: last_round_base,
        round_number: last_round_number,
    };
    let tracker = crate::round::Round::new(params);

    let mut restored = VotingRound::completed(tracker, finalized_sender, None, env);

    for vote in last_round_votes {
        // A vote that cannot be imported makes the whole reconstruction fail.
        if restored.handle_vote(vote).is_err() {
            return None;
        }
    }

    if !restored.round_state().completable {
        return None;
    }

    Some(restored)
}
/// A future that drives voting: it owns the round currently voted in, keeps
/// earlier rounds in the background, and handles the global commit and
/// catch-up channels.
pub struct Voter<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
// Shared environment handle; cloned into every round that is created.
env: Arc<E>,
// Voter set handed to every new round and used to validate commits and catch-ups.
voters: VoterSet<E::Id>,
// The round with the highest number; replaced by its successor once completed.
best_round: VotingRound<H, N, E>,
// Earlier rounds that are still polled in the background.
past_rounds: PastRounds<H, N, E>,
// Receiving end of the channel whose sender is handed to each `VotingRound`.
finalized_notifications: UnboundedReceiver<FinalizedNotification<H, N, E>>,
// Highest block number for which `env.finalize_block` was called (or the initial value).
last_finalized_number: N,
// Global incoming messages (commits and catch-ups).
global_in: GlobalIn,
// Global outgoing messages, queued until the sink accepts them.
global_out: Buffered<GlobalOut>,
// Highest finalized (hash, number) seen via rounds; passed to `VotingRound::new`
// whenever a new round is started.
last_finalized_in_rounds: (H, N),
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
/// Creates a voter that starts voting in round `last_round_number + 1`.
///
/// * `env`, `voters` - environment and voter set shared with every round.
/// * `global_comms` - the global incoming stream and outgoing sink.
/// * `last_round_number`, `last_round_votes`, `last_round_base` - the last
///   round; when the number is greater than zero the round is rebuilt from
///   its votes and, if that succeeds, kept as a background round whose
///   bridge state seeds the new best round. Otherwise the genesis round
///   state built from `last_round_base` is used as the seed.
/// * `last_finalized` - hash and number of the last finalized block.
pub fn new(
    env: Arc<E>,
    voters: VoterSet<E::Id>,
    global_comms: (GlobalIn, GlobalOut),
    last_round_number: u64,
    last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
    last_round_base: (H, N),
    last_finalized: (H, N),
) -> Self {
    let (global_in, global_out) = global_comms;
    let (finalized_sender, finalized_notifications) = mpsc::unbounded();
    let mut past_rounds = PastRounds::new();

    // Fallback seed for the first round: genesis state on top of the given base.
    let mut seed_state =
        crate::bridge_state::bridge_state(RoundState::genesis(last_round_base.clone())).1;

    if last_round_number > 0 {
        if let Some(mut restored) = instantiate_last_round(
            voters.clone(),
            last_round_votes,
            last_round_number,
            last_round_base,
            finalized_sender.clone(),
            env.clone(),
        ) {
            seed_state = restored.bridge_state();
            past_rounds.push(&*env, restored);
        }
    }

    let best_round = VotingRound::new(
        last_round_number + 1,
        voters.clone(),
        last_finalized.clone(),
        Some(seed_state),
        finalized_sender,
        env.clone(),
    );

    Voter {
        env,
        voters,
        best_round,
        past_rounds,
        finalized_notifications,
        // `N: Copy`, so the number is copied out before the tuple is moved below.
        last_finalized_number: last_finalized.1,
        last_finalized_in_rounds: last_finalized,
        global_in,
        global_out: Buffered::new(global_out),
    }
}
/// Polls the background rounds and handles pending finalization
/// notifications.
///
/// Every `(round number, commit)` produced by `past_rounds` is queued on the
/// global outgoing sink. Every finalization notification updates
/// `past_rounds`, is reported to the environment if its block number is
/// higher than anything reported before, and moves
/// `last_finalized_in_rounds` forward when its number is higher.
fn prune_background_rounds(&mut self) -> Result<(), E::Error> {
    loop {
        match self.past_rounds.poll()? {
            Async::Ready(Some((number, commit))) => {
                self.global_out.push(CommunicationOut::Commit(number, commit));
            }
            Async::Ready(None) | Async::NotReady => break,
        }
    }

    loop {
        let notification = match self.finalized_notifications.poll()
            .expect("unbounded receivers do not have spurious errors; qed")
        {
            Async::Ready(notification) => notification,
            Async::NotReady => break,
        };

        let (f_hash, f_num, round, commit) =
            notification.expect("one sender always kept alive in self.best_round; qed");

        self.past_rounds.update_finalized(f_num);

        let is_new_height = self.set_last_finalized_number(f_num);
        if is_new_height {
            self.env.finalize_block(f_hash.clone(), f_num, round, commit)?;
        }

        if f_num > self.last_finalized_in_rounds.1 {
            self.last_finalized_in_rounds = (f_hash, f_num);
        }
    }

    Ok(())
}
/// Drains `global_in`, handling every commit and catch-up message that is
/// currently available.
///
/// * Commit: offered to `past_rounds` first. If it is handed back it is
///   validated here; a commit with a GHOST block is reported as good (and the
///   block is finalized through the environment when its number is higher
///   than the last finalized number), otherwise it is reported as bad. A
///   commit that `past_rounds` did not hand back is reported as good.
/// * Catch-up: validated with `validate_catch_up`. An invalid message is
///   reported as bad and skipped. A valid one becomes a completed background
///   round, a new best round is started right after it, and the previous best
///   round is moved to the background.
///
/// Returns an error if the stream, commit validation or an environment
/// callback fails.
fn process_incoming(&mut self) -> Result<(), E::Error> {
    while let Async::Ready(Some(item)) = self.global_in.poll()? {
        match item {
            CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => {
                trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}",
                    round_number,
                    commit.target_number,
                    commit.target_hash,
                );

                let commit: Commit<_, _, _, _> = commit.into();

                // `import_commit` hands the commit back when it has to be
                // validated and acted upon here.
                if let Some(commit) = self.past_rounds.import_commit(round_number, commit) {
                    let validation_result = validate_commit(&commit, &self.voters, &*self.env)?;

                    if let Some((finalized_hash, finalized_number)) = validation_result.ghost {
                        // Same check as `set_last_finalized_number`, written out
                        // against the field directly instead of through `&mut self`.
                        let last_finalized_number = &mut self.last_finalized_number;
                        if finalized_number > *last_finalized_number {
                            *last_finalized_number = finalized_number;
                            self.env.finalize_block(finalized_hash, finalized_number, round_number, commit)?;
                        }

                        process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
                    } else {
                        process_commit_outcome.run(
                            CommitProcessingOutcome::Bad(BadCommit::from(validation_result)),
                        );
                    }
                } else {
                    process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
                }
            }
            CommunicationIn::CatchUp(catch_up, mut process_catch_up_outcome) => {
                trace!(target: "afg", "Got catch-up message for round {}", catch_up.round_number);

                let round = match validate_catch_up(
                    catch_up,
                    &*self.env,
                    &self.voters,
                    self.best_round.round_number(),
                ) {
                    Some(round) => round,
                    None => {
                        process_catch_up_outcome.run(CatchUpProcessingOutcome::Bad(BadCatchUp::new()));
                        // Keep draining: returning here would leave queued
                        // messages unprocessed without the stream having
                        // registered this task for a wake-up.
                        continue;
                    }
                };

                let state = round.state();

                // From here on the caught-up round is a completed background
                // round and voting continues in the round after it.
                let mut just_completed = VotingRound::completed(
                    round,
                    self.best_round.finalized_sender(),
                    None,
                    self.env.clone(),
                );

                let new_best = VotingRound::new(
                    just_completed.round_number() + 1,
                    self.voters.clone(),
                    self.last_finalized_in_rounds.clone(),
                    Some(just_completed.bridge_state()),
                    self.best_round.finalized_sender(),
                    self.env.clone(),
                );

                // Updated only after `new_best` was created, so the new round
                // is still started from the previous value.
                if let Some((f_hash, f_num)) = state.finalized.clone() {
                    if f_num > self.last_finalized_in_rounds.1 {
                        self.last_finalized_in_rounds = (f_hash, f_num);
                    }
                }

                self.env.completed(
                    just_completed.round_number(),
                    just_completed.round_state(),
                    just_completed.dag_base(),
                    just_completed.historical_votes(),
                )?;

                self.past_rounds.push(&*self.env, just_completed);

                // Swap in the new best round and keep the old one in the background.
                self.past_rounds.push(
                    &*self.env,
                    std::mem::replace(&mut self.best_round, new_best),
                );

                process_catch_up_outcome.run(CatchUpProcessingOutcome::Good(GoodCatchUp::new()));
            },
        }
    }

    Ok(())
}
/// Polls the best round and, once it is ready and its state is
/// `Precommitted`, replaces it with the next round and polls the whole voter
/// again.
///
/// Returns `NotReady` when the best round is not ready or has not reached the
/// `Precommitted` state; otherwise returns whatever the recursive `poll`
/// returns.
fn process_best_round(&mut self) -> Poll<(), E::Error> {
    let polled = self.best_round.poll()?;
    let precommitted = match self.best_round.state() {
        Some(&VotingRoundState::Precommitted) => true,
        _ => false,
    };

    // Both conditions are required before the next round is started.
    match (polled, precommitted) {
        (Async::Ready(()), true) => {}
        _ => return Ok(Async::NotReady),
    }

    trace!(target: "afg", "Best round at {} has become completable. Starting new best round at {}",
        self.best_round.round_number(),
        self.best_round.round_number() + 1,
    );

    self.completed_best_round()?;

    // Poll again so the freshly created round gets driven right away.
    self.poll()
}
/// Reports the best round to the environment via `completed`, starts the
/// round with the next number as the new best round, and moves the old best
/// round into `past_rounds`.
fn completed_best_round(&mut self) -> Result<(), E::Error> {
    self.env.completed(
        self.best_round.round_number(),
        self.best_round.round_state(),
        self.best_round.dag_base(),
        self.best_round.historical_votes(),
    )?;

    let successor = VotingRound::new(
        self.best_round.round_number() + 1,
        self.voters.clone(),
        self.last_finalized_in_rounds.clone(),
        Some(self.best_round.bridge_state()),
        self.best_round.finalized_sender(),
        self.env.clone(),
    );

    // Swap in the successor and keep the finished round in the background.
    self.past_rounds.push(
        &*self.env,
        ::std::mem::replace(&mut self.best_round, successor),
    );

    Ok(())
}
/// Raises `last_finalized_number` to `finalized_number` if the latter is
/// strictly greater. Returns `true` when the stored number was changed.
fn set_last_finalized_number(&mut self, finalized_number: N) -> bool {
    let advanced = finalized_number > self.last_finalized_number;
    if advanced {
        self.last_finalized_number = finalized_number;
    }
    advanced
}
}
/// Driving the voter: each poll handles global messages, background rounds
/// and the outgoing queue before polling the best round.
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Future for Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
type Item = ();
type Error = E::Error;
fn poll(&mut self) -> Poll<(), E::Error> {
// Handle incoming commits and catch-ups first.
self.process_incoming()?;
// Then poll background rounds; this may queue commits on `global_out`.
self.prune_background_rounds()?;
// Try to flush queued outgoing messages; only errors matter here, the
// readiness of the sink is ignored.
self.global_out.poll()?;
// The result of the best round decides the result of this poll.
self.process_best_round()
}
}
/// Checks a catch-up message and, if it is acceptable, returns the vote
/// tracker (`Round`) built from it.
///
/// The message is rejected (`None`) when:
/// * its round number is not greater than `best_round_number`,
/// * any prevote or precommit comes from an id that is not in `voters`,
/// * the combined weight of distinct prevoters or of distinct precommitters
///   is below the voter-set threshold,
/// * importing any prevote or precommit into the round fails, or
/// * the resulting round state is not completable.
fn validate_catch_up<H, N, S, I, E>(
    catch_up: CatchUp<H, N, S, I>,
    env: &E,
    voters: &VoterSet<I>,
    best_round_number: u64,
) -> Option<crate::round::Round<I, H, N, S>> where
    H: Clone + Eq + Ord + std::fmt::Debug,
    N: BlockNumberOps + std::fmt::Debug,
    S: Clone + Eq,
    I: Clone + Eq + std::fmt::Debug + Ord,
    E: Environment<H, N>,
{
    if catch_up.round_number <= best_round_number {
        trace!(target: "afg", "Ignoring because best round number is {}",
            best_round_number);

        return None;
    }

    // Cheap pre-check before any vote is imported: all voters are known and
    // both vote kinds reach the threshold.
    {
        // voter id -> (has prevoted, has precommitted); each voter counts once per kind.
        let mut participation = std::collections::BTreeMap::new();

        for prevote in &catch_up.prevotes {
            if !voters.contains_key(&prevote.id) {
                trace!(target: "afg",
                    "Ignoring invalid catch up, invalid voter: {:?}",
                    prevote.id,
                );

                return None;
            }

            participation.entry(prevote.id.clone()).or_insert((false, false)).0 = true;
        }

        for precommit in &catch_up.precommits {
            if !voters.contains_key(&precommit.id) {
                trace!(target: "afg",
                    "Ignoring invalid catch up, invalid voter: {:?}",
                    precommit.id,
                );

                return None;
            }

            participation.entry(precommit.id.clone()).or_insert((false, false)).1 = true;
        }

        let mut prevote_weight = 0;
        let mut precommit_weight = 0;
        for (id, (prevoted, precommitted)) in participation {
            let weight = voters.info(&id).map_or(0, |i| i.weight());

            if prevoted {
                prevote_weight += weight;
            }

            if precommitted {
                precommit_weight += weight;
            }
        }

        let threshold = voters.threshold();
        if prevote_weight < threshold || precommit_weight < threshold {
            trace!(target: "afg",
                "Ignoring invalid catch up, missing voter threshold"
            );

            return None;
        }
    }

    let mut round = crate::round::Round::new(crate::round::RoundParams {
        round_number: catch_up.round_number,
        voters: voters.clone(),
        base: (catch_up.base_hash.clone(), catch_up.base_number),
    });

    // Import all prevotes first, then all precommits.
    for crate::SignedPrevote { prevote, id, signature } in catch_up.prevotes {
        if let Err(e) = round.import_prevote(env, prevote, id, signature) {
            trace!(target: "afg",
                "Ignoring invalid catch up, error importing prevote: {:?}",
                e,
            );

            return None;
        }
    }

    for crate::SignedPrecommit { precommit, id, signature } in catch_up.precommits {
        if let Err(e) = round.import_precommit(env, precommit, id, signature) {
            trace!(target: "afg",
                "Ignoring invalid catch up, error importing precommit: {:?}",
                e,
            );

            return None;
        }
    }

    if round.state().completable {
        Some(round)
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SignedPrecommit;
use crate::testing::{
self,
chain::GENESIS_HASH,
environment::{Environment, Id, Signature},
};
use std::time::Duration;
use tokio::prelude::FutureExt;
use tokio::runtime::current_thread;
// A single voter holding the whole weight (100) runs on the test network.
// The test finishes once the finalized stream yields an item whose block
// number is no longer below 6.
#[test]
fn talking_to_myself() {
let local_id = Id(5);
let voters = std::iter::once((local_id, 100)).collect();
let (network, routing_task) = testing::environment::make_network();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
// Extend the test chain with five blocks on top of genesis.
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
// Start from scratch: last round number 0 and no recorded votes.
let voter = Voter::new(
env.clone(),
voters,
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
// Voter and routing task run until the exit signal fires.
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
tokio::spawn(exit.until(routing_task).map(|_| ()));
// Consume finalization events while the number is below 6, then stop everything.
finalized
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
.map(|_| signal.fire())
})).unwrap();
}
// Ten voters of weight 1 are registered, but only voters 0..7 are started.
// Each started voter must see a finalization event whose block number is no
// longer below 6.
#[test]
fn finalizing_at_fault_threshold() {
let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect();
let (network, routing_task) = testing::environment::make_network();
let (signal, exit) = ::exit_future::signal();
current_thread::block_on_all(::futures::future::lazy(move || {
tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
// One environment, chain and voter per started node.
let finalized_streams = (0..7).map(move |i| {
let local_id = Id(i);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
Vec::new(),
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
// Resolves once this node's finalized stream reaches a number of at least 6.
finalized
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
});
// Wait for all started nodes, then stop the spawned tasks.
::futures::future::join_all(finalized_streams).map(|_| signal.fire())
})).unwrap();
}
// A single voter holding the whole weight runs on the test network; the test
// finishes as soon as one item shows up on the global commit stream.
#[test]
fn broadcast_commit() {
let local_id = Id(5);
let voters: VoterSet<_> = std::iter::once((local_id, 100)).collect();
let (network, routing_task) = testing::environment::make_network();
// Separate handle on the global channel, used only to observe commits.
let (commits, _) = network.make_global_comms();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
tokio::spawn(exit.until(routing_task).map(|_| ()));
// Wait for exactly one global message, then stop everything.
commits.take(1).for_each(|_| Ok(())).map(|_| signal.fire())
})).unwrap();
}
// Two voters are registered: the local one (weight 100) and `test_id`
// (weight 201), whose messages are injected by the test itself. After the
// test has sent `test_id`'s votes and a commit for block "E", the main future
// consumes the first item of the global commit stream and then asserts that
// waiting for a further item fails (500 ms timeout).
#[test]
fn broadcast_commit_only_if_newer() {
let local_id = Id(5);
let test_id = Id(42);
let voters: VoterSet<_> = [
(local_id, 100),
(test_id, 201),
].iter().cloned().collect();
let (network, routing_task) = testing::environment::make_network();
let (commits_stream, commits_sink) = network.make_global_comms();
// Round 1 channel through which the test speaks as `test_id`.
let (round_stream, round_sink) = network.make_round_comms(1, test_id);
let prevote = Message::Prevote(Prevote {
target_hash: "E",
target_number: 6,
});
let precommit = Message::Precommit(Precommit {
target_hash: "E",
target_number: 6,
});
// Commit for round 1 whose only precommit is the one from `test_id`.
let commit = (1, Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: Signature(test_id.0),
id: test_id
}],
});
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|e| panic!("Error voting: {:?}", e))).map(|_| ()));
tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
// Scripted behaviour of `test_id` in round 1.
tokio::spawn(exit.until(::futures::future::lazy(|| {
round_stream.into_future().map_err(|(e, _)| e)
.and_then(|(value, stream)| {
// The first round message must be the local voter's prevote.
assert!(match value {
Some(SignedMessage { message: Message::Prevote(_), id: Id(5), .. }) => true,
_ => false,
});
// Then send `test_id`'s prevote and precommit.
let votes = vec![prevote, precommit].into_iter().map(Result::Ok);
round_sink.send_all(futures::stream::iter_result(votes)).map(|_| stream)
})
.and_then(|stream| {
// Skip round messages until the local voter's precommit is seen.
stream.take_while(|value| match value {
SignedMessage { message: Message::Precommit(_), id: Id(5), .. } => Ok(false),
_ => Ok(true),
}).for_each(|_| Ok(()))
})
.and_then(|_| {
// Finally send the prepared commit on the global channel.
commits_sink.send(CommunicationOut::Commit(commit.0, commit.1))
})
.map_err(|_| ())
})).map(|_| ()));
// After the first global item, a further one must not arrive within 500 ms:
// the chained future is expected to end in an error.
commits_stream.into_future().map_err(|_| ())
.and_then(|(_, stream)| {
stream.take(1).for_each(|_| Ok(()))
.timeout(Duration::from_millis(500)).map_err(|_| ())
})
.then(|res| {
assert!(res.is_err());
signal.fire();
futures::future::ok::<(), ()>(())
})
})).unwrap();
}
// The voter is started with last round number 1, and the test sends it a
// commit labelled with round 0 whose only precommit comes from `test_id`
// (weight 201 of 301). The test finishes once the finalized stream yields an
// item whose block number is no longer below 6.
#[test]
fn import_commit_for_any_round() {
let local_id = Id(5);
let test_id = Id(42);
let voters: VoterSet<_> = [
(local_id, 100),
(test_id, 201),
].iter().cloned().collect();
let (network, routing_task) = testing::environment::make_network();
// Handle on the global channel used to inject the commit.
let (_, commits_sink) = network.make_global_comms();
let (signal, exit) = ::exit_future::signal();
let commit = (0, Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: Signature(test_id.0),
id: test_id
}],
});
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
1,
Vec::new(),
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
tokio::spawn(exit.until(routing_task).map(|_| ()));
// Inject the commit on the global channel.
tokio::spawn(commits_sink.send(CommunicationOut::Commit(commit.0, commit.1))
.map_err(|_| ()).map(|_| ()));
// Wait until a block with number of at least 6 is reported as finalized.
env.finalized_stream()
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
.map(|_| signal.fire())
})).unwrap();
}
// A voter that starts at round 1 receives a catch-up message for round 5
// carrying prevotes and precommits of all three registered voters. The test
// polls the voter by hand until its best round number is 6.
#[test]
fn skips_to_latest_round_after_catch_up() {
let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect();
let (network, routing_task) = testing::environment::make_network();
let (signal, exit) = ::exit_future::signal();
current_thread::block_on_all(::futures::future::lazy(move || {
tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
// The local id (4) is not part of the voter set built above (ids 0..3).
let mut unsynced_voter = {
let local_id = Id(4);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
Vec::new(),
last_finalized,
last_finalized,
)
};
// Helpers building a signed prevote / precommit for block "C" (number 4).
let pv = |id| crate::SignedPrevote {
prevote: crate::Prevote { target_hash: "C", target_number: 4 },
id: Id(id),
signature: Signature(99),
};
let pc = |id| crate::SignedPrecommit {
precommit: crate::Precommit { target_hash: "C", target_number: 4 },
id: Id(id),
signature: Signature(99),
};
// Catch-up for round 5 with votes from all three voters; no outcome callback.
network.send_message(CommunicationIn::CatchUp(
CatchUp {
base_number: 1,
base_hash: GENESIS_HASH,
round_number: 5,
prevotes: vec![pv(0), pv(1), pv(2)],
precommits: vec![pc(0), pc(1), pc(2)],
},
Callback::Blank,
));
// Drive the voter manually; done once the best round is round 6.
::futures::future::poll_fn(move || -> Poll<(), ()> {
let poll = unsynced_voter.poll().map_err(|_| ())?;
if unsynced_voter.best_round.round_number() == 6 {
Ok(Async::Ready(()))
} else {
Ok(poll)
}
}).map(move |_| signal.fire())
})).unwrap();
}
// A single voter is started with last round number 10 but without any
// recorded votes for that round. The test finishes once the finalized stream
// yields an item whose block number is no longer below 6.
#[test]
fn pick_up_from_prior_without_grandparent_state() {
let local_id = Id(5);
let voters = std::iter::once((local_id, 100)).collect();
let (network, routing_task) = testing::environment::make_network();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
// Last round number 10, empty vote list.
let voter = Voter::new(
env.clone(),
voters,
global_comms,
10,
Vec::new(),
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
tokio::spawn(exit.until(routing_task).map(|_| ()));
finalized
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
.map(|_| signal.fire())
})).unwrap();
}
// A voter (id 99 out of 100 voters of weight 1) is started with last round
// number 1 and the recorded votes of 67 other voters. The same votes are also
// sent on the round 2 channel, and one extra precommit is sent for round 1.
// The test waits until the local voter's prevote shows up in round 3 and then
// checks the environment's `last_completed_and_concluded()` value.
#[test]
fn pick_up_from_prior_with_grandparent_state() {
let local_id = Id(99);
let voters = (0..100).map(|id| (Id(id), 1)).collect::<VoterSet<_>>();
let (network, routing_task) = testing::environment::make_network();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network.clone(), local_id));
// Kept outside the future so the environment can be inspected afterwards.
let outer_env = env.clone();
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let mut last_round_votes = Vec::new();
{
// Voters 0..67 all prevote "E"; the first 66 precommit "D", the 67th precommits "E".
for id in 0..67 {
let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
let precommit = if id < 66 {
Message::Precommit(Precommit { target_hash: "D", target_number: 5 })
} else {
Message::Precommit(Precommit { target_hash: "E", target_number: 6 })
};
// Record both votes as votes of the last round handed to `Voter::new`...
last_round_votes.push(SignedMessage {
message: prevote.clone(),
signature: Signature(id),
id: Id(id),
});
last_round_votes.push(SignedMessage {
message: precommit.clone(),
signature: Signature(id),
id: Id(id),
});
// ...and also send them on the round 2 channel in the name of the same voter.
let (_, round_sink) = network.make_round_comms(2, Id(id));
tokio::spawn(
round_sink.send(prevote).and_then(move |sink| sink.send(precommit))
.map_err(|_| ())
.map(|_| ())
);
}
}
{
// One more precommit for round 1, sent by voter 67.
let sender = Id(67);
let (_, round_sink) = network.make_round_comms(1, sender);
let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 3 });
tokio::spawn(round_sink.send(last_precommit).map(|_| ()).map_err(|_| ()));
}
let voter = Voter::new(
env.clone(),
voters,
global_comms,
1,
last_round_votes,
last_finalized,
last_finalized,
);
tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
tokio::spawn(exit.until(routing_task).map(|_| ()));
// Observe round 3 and skip messages until the local voter's prevote appears.
let (round_stream, _) = network.make_round_comms(3, Id(1000));
round_stream
.skip_while(move |v| if let Message::Prevote(_) = v.message {
Ok(v.id != local_id)
} else {
Ok(true)
})
.into_future()
.map(move |(x, _stream)| { signal.fire(); x })
.map_err(|(err, _stream)| err)
})).unwrap();
// Expected value reported by the test environment once round 3 was reached.
assert_eq!(outer_env.last_completed_and_concluded(), (2, 1));
}
}