use futures::prelude::*;
use futures::sync::mpsc::{self, UnboundedReceiver};
use std::collections::VecDeque;
use std::cmp;
use std::hash::Hash;
use std::sync::Arc;
use crate::round::State as RoundState;
use crate::{
Chain, Commit, CompactCommit, Equivocation, Message, Prevote, Precommit, PrimaryPropose,
SignedMessage, BlockNumberOps, validate_commit, CommitValidationResult
};
use crate::voter_set::VoterSet;
use past_rounds::PastRounds;
use voting_round::{VotingRound, State as VotingRoundState};
mod past_rounds;
mod voting_round;
pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
type Timer: Future<Item=(),Error=Self::Error>;
type Id: Hash + Clone + Eq + ::std::fmt::Debug;
type Signature: Eq + Clone;
type In: Stream<Item=SignedMessage<H, N, Self::Signature, Self::Id>,Error=Self::Error>;
type Out: Sink<SinkItem=Message<H, N>,SinkError=Self::Error>;
type Error: From<crate::Error> + ::std::error::Error;
fn round_data(&self, round: u64) -> RoundData<
Self::Id,
Self::Timer,
Self::In,
Self::Out,
>;
fn round_commit_timer(&self) -> Self::Timer;
fn proposed(&self, round: u64, propose: PrimaryPropose<H, N>) -> Result<(), Self::Error>;
fn prevoted(&self, round: u64, prevote: Prevote<H, N>) -> Result<(), Self::Error>;
fn precommitted(&self, round: u64, precommit: Precommit<H, N>) -> Result<(), Self::Error>;
fn completed(
&self,
round: u64,
state: RoundState<H, N>,
base: (H, N),
votes: Vec<SignedMessage<H, N, Self::Signature, Self::Id>>,
) -> Result<(), Self::Error>;
fn finalize_block(&self, hash: H, number: N, round: u64, commit: Commit<H, N, Self::Signature, Self::Id>) -> Result<(), Self::Error>;
fn prevote_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Prevote<H, N>, Self::Signature>);
fn precommit_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Precommit<H, N>, Self::Signature>);
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommunicationOut<H, N, S, Id> {
Commit(u64, Commit<H, N, S, Id>),
Auxiliary(AuxiliaryCommunication<H, N, Id>),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitProcessingOutcome {
Good(GoodCommit),
Bad(BadCommit),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCommit {
_priv: (),
}
impl GoodCommit {
pub(crate) fn new() -> Self {
GoodCommit { _priv: () }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCommit {
_priv: (),
num_precommits: usize,
num_duplicated_precommits: usize,
num_equivocations: usize,
num_invalid_voters: usize,
}
impl BadCommit {
pub fn num_precommits(&self) -> usize {
self.num_precommits
}
pub fn num_duplicated(&self) -> usize {
self.num_duplicated_precommits
}
pub fn num_equivocations(&self) -> usize {
self.num_equivocations
}
pub fn num_invalid_voters(&self) -> usize {
self.num_invalid_voters
}
}
impl<H, N> From<CommitValidationResult<H, N>> for BadCommit {
fn from(r: CommitValidationResult<H, N>) -> Self {
BadCommit {
num_precommits: r.num_precommits,
num_duplicated_precommits: r.num_duplicated_precommits,
num_equivocations: r.num_equivocations,
num_invalid_voters: r.num_invalid_voters,
_priv: (),
}
}
}
pub enum Callback {
Blank,
Work(Box<FnMut(CommitProcessingOutcome) + Send>),
}
#[cfg(test)]
impl Clone for Callback {
fn clone(&self) -> Self {
Callback::Blank
}
}
impl Callback {
pub(crate) fn run(&mut self, o: CommitProcessingOutcome) {
match self {
Callback::Blank => {},
Callback::Work(cb) => cb(o),
}
}
}
#[cfg_attr(test, derive(Clone))]
pub enum CommunicationIn<H, N, S, Id> {
Commit(u64, CompactCommit<H, N, S, Id>, Callback),
Auxiliary(AuxiliaryCommunication<H, N, Id>),
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "derive-codec", derive(Encode, Decode))]
pub enum AuxiliaryCommunication<H, N, Id> {
#[cfg_attr(feature = "derive-codec", codec(index = "0"))]
CatchUpRequest(CatchUpRequest<Id>),
#[cfg_attr(feature = "derive-codec", codec(index = "1"))]
CatchUp(CatchUp<H, N>)
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "derive-codec", derive(Encode, Decode))]
pub struct CatchUpRequest<Id> {
pub from: Id,
pub current_round: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "derive-codec", derive(Encode, Decode))]
pub struct CatchUp<H, N> {
pub prevotes: Vec<Prevote<H, N>>,
pub precommits: Vec<Precommit<H, N>>,
}
pub struct RoundData<Id, Timer, Input, Output> {
pub voter_id: Option<Id>,
pub prevote_timer: Timer,
pub precommit_timer: Timer,
pub incoming: Input,
pub outgoing: Output,
}
struct Buffered<S: Sink> {
inner: S,
buffer: VecDeque<S::SinkItem>,
}
impl<S: Sink> Buffered<S> {
fn new(inner: S) -> Buffered<S> {
Buffered {
buffer: VecDeque::new(),
inner
}
}
fn push(&mut self, item: S::SinkItem) {
self.buffer.push_back(item);
}
fn poll(&mut self) -> Poll<(), S::SinkError> {
let polled = self.schedule_all()?;
match polled {
Async::Ready(()) => self.inner.poll_complete(),
Async::NotReady => {
self.inner.poll_complete()?;
Ok(Async::NotReady)
}
}
}
fn schedule_all(&mut self) -> Poll<(), S::SinkError> {
while let Some(front) = self.buffer.pop_front() {
match self.inner.start_send(front) {
Ok(AsyncSink::Ready) => continue,
Ok(AsyncSink::NotReady(front)) => {
self.buffer.push_front(front);
break;
}
Err(e) => return Err(e),
}
}
if self.buffer.is_empty() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
pub struct Voter<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> where
H: Hash + Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
env: Arc<E>,
voters: VoterSet<E::Id>,
best_round: VotingRound<H, N, E>,
past_rounds: PastRounds<H, N, E>,
finalized_notifications: UnboundedReceiver<(H, N, u64, Commit<H, N, E::Signature, E::Id>)>,
last_finalized_number: N,
prospective_round: Option<VotingRound<H, N, E>>,
global_in: GlobalIn,
global_out: Buffered<GlobalOut>,
last_finalized_in_rounds: (H, N),
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut> where
H: Hash + Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
pub fn new(
env: Arc<E>,
voters: VoterSet<E::Id>,
global_comms: (GlobalIn, GlobalOut),
last_round_number: u64,
last_round_state: RoundState<H, N>,
last_finalized: (H, N),
) -> Self {
let (finalized_sender, finalized_notifications) = mpsc::unbounded();
let last_finalized_number = last_finalized.1.clone();
let (_, last_round_state) = crate::bridge_state::bridge_state(last_round_state);
let best_round = VotingRound::new(
last_round_number + 1,
voters.clone(),
last_finalized.clone(),
Some(last_round_state),
finalized_sender,
env.clone(),
);
let (global_in, global_out) = global_comms;
Voter {
env,
voters,
best_round,
past_rounds: PastRounds::new(),
prospective_round: None,
finalized_notifications,
last_finalized_number,
last_finalized_in_rounds: last_finalized,
global_in,
global_out: Buffered::new(global_out),
}
}
fn prune_background_rounds(&mut self) -> Result<(), E::Error> {
while let Async::Ready(Some((number, commit))) = self.past_rounds.poll()? {
self.global_out.push(CommunicationOut::Commit(number, commit));
}
while let Async::Ready(res) = self.finalized_notifications.poll()
.expect("unbounded receivers do not have spurious errors; qed")
{
let (f_hash, f_num, round, commit) =
res.expect("one sender always kept alive in self.best_round; qed");
self.past_rounds.update_finalized(f_num);
if self.set_last_finalized_number(f_num.clone()) {
self.env.finalize_block(f_hash.clone(), f_num.clone(), round, commit)?;
}
if f_num > self.last_finalized_in_rounds.1 {
self.last_finalized_in_rounds = (f_hash, f_num);
}
}
Ok(())
}
fn process_incoming(&mut self) -> Result<(), E::Error> {
let mut highest_incoming_foreign_commit = None;
while let Async::Ready(Some(item)) = self.global_in.poll()? {
match item {
CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => {
trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}",
round_number,
commit.target_number,
commit.target_hash,
);
let commit: Commit<_, _, _, _> = commit.into();
if let Some(commit) = self.past_rounds.import_commit(round_number, commit) {
let validation_result = validate_commit(&commit, &self.voters, &*self.env)?;
if let Some((finalized_hash, finalized_number)) = validation_result.ghost {
highest_incoming_foreign_commit = Some(highest_incoming_foreign_commit
.map_or(round_number, |n| cmp::max(n, round_number)));
let last_finalized_number = &mut self.last_finalized_number;
if finalized_number > *last_finalized_number {
*last_finalized_number = finalized_number.clone();
self.env.finalize_block(finalized_hash, finalized_number, round_number, commit)?;
}
process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
} else {
process_commit_outcome.run(
CommitProcessingOutcome::Bad(BadCommit::from(validation_result)),
);
}
} else {
process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
}
}
CommunicationIn::Auxiliary(_aux) => {},
}
}
if let Some(round_number) = highest_incoming_foreign_commit {
self.maybe_start_prospective_round(round_number)?;
}
Ok(())
}
fn maybe_start_prospective_round(&mut self, round_number: u64) -> Result<(), E::Error> {
let prospective_round_number =
self.prospective_round.as_ref().map(|r| r.round_number());
let should_start_prospective = round_number > self.best_round.round_number() + 1 &&
prospective_round_number.map_or(true, |n| round_number > n);
if should_start_prospective {
trace!(target: "afg", "Imported commit for later round than current best {}, starting prospective round at {}",
self.best_round.round_number(),
round_number + 1,
);
let ghost_base = self.prospective_round.as_ref()
.and_then(|r| r.round_state().finalized.clone())
.or_else(|| self.best_round.round_state().finalized.clone())
.unwrap_or_else(|| self.last_finalized_in_rounds.clone());
self.prospective_round = Some(VotingRound::new(
round_number + 1,
self.voters.clone(),
ghost_base,
None,
self.best_round.finalized_sender(),
self.env.clone(),
));
}
Ok(())
}
fn process_prospective_round(&mut self) -> Poll<Option<bool>, E::Error> {
let mut best_round_completable = None;
if let Some(mut prospective_round) = self.prospective_round.take() {
match prospective_round.poll() {
Ok(Async::Ready(())) => {
trace!(target: "afg", "Prospective round at {} has become completable. Starting best round at {}",
prospective_round.round_number(),
prospective_round.round_number() + 1);
self.completed_prospective_round(prospective_round)?;
return Ok(Async::NotReady);
},
Ok(_) => {
assert!(self.best_round.round_number() < prospective_round.round_number());
if let Async::Ready(()) = self.best_round.poll()? {
if self.best_round.round_number() == prospective_round.round_number() - 1 {
trace!(target: "afg", "Best round at {} has caught up with prospective round at {}. \
Setting best round to prospective round.",
self.best_round.round_number(),
prospective_round.round_number());
prospective_round.bridge_state_from(&mut self.best_round);
self.completed_best_round(Some(prospective_round))?;
return Ok(Async::NotReady);
}
best_round_completable = Some(true);
} else {
self.prospective_round = Some(prospective_round);
best_round_completable = Some(false);
}
},
Err(e) => {
trace!(target: "afg", "Prospective round at {} has failed with: {}.",
prospective_round.round_number(),
e,
);
}
}
}
Ok(Async::Ready(best_round_completable))
}
fn process_best_round(&mut self, best_round_completable: Option<bool>) -> Poll<(), E::Error> {
let should_start_next = {
let completable = match best_round_completable {
Some(true) => true,
Some(false) => false,
None => match self.best_round.poll()? {
Async::Ready(()) => true,
Async::NotReady => false,
},
};
let precommitted = match self.best_round.state() {
Some(&VotingRoundState::Precommitted) => true,
_ => false,
};
completable && precommitted
};
if !should_start_next { return Ok(Async::NotReady) }
trace!(target: "afg", "Best round at {} has become completable. Starting new best round at {}",
self.best_round.round_number(),
self.best_round.round_number() + 1,
);
self.completed_best_round(None)?;
self.poll()
}
fn completed_best_round(&mut self, next_round: Option<VotingRound<H, N, E>>) -> Result<(), E::Error> {
self.env.completed(
self.best_round.round_number(),
self.best_round.round_state(),
self.best_round.dag_base(),
self.best_round.votes(),
)?;
let old_round_number = self.best_round.round_number();
let next_round = next_round.unwrap_or_else(||
VotingRound::new(
old_round_number + 1,
self.voters.clone(),
self.last_finalized_in_rounds.clone(),
Some(self.best_round.bridge_state()),
self.best_round.finalized_sender(),
self.env.clone(),
)
);
let old_round = ::std::mem::replace(&mut self.best_round, next_round);
self.past_rounds.push(&*self.env, old_round);
Ok(())
}
fn completed_prospective_round(&mut self, mut prospective_round: VotingRound<H, N, E>)
-> Result<(), E::Error>
{
self.env.completed(
prospective_round.round_number(),
prospective_round.round_state(),
prospective_round.dag_base(),
prospective_round.votes(),
)?;
self.best_round = VotingRound::new(
prospective_round.round_number() + 1,
self.voters.clone(),
prospective_round.dag_base(),
Some(prospective_round.bridge_state()),
self.best_round.finalized_sender(),
self.env.clone(),
);
self.past_rounds.push(&*self.env, prospective_round);
Ok(())
}
fn set_last_finalized_number(&mut self, finalized_number: N) -> bool {
let last_finalized_number = &mut self.last_finalized_number;
if finalized_number > *last_finalized_number {
*last_finalized_number = finalized_number;
return true;
}
false
}
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Future for Voter<H, N, E, GlobalIn, GlobalOut> where
H: Hash + Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=CommunicationIn<H, N, E::Signature, E::Id>, Error=E::Error>,
GlobalOut: Sink<SinkItem=CommunicationOut<H, N, E::Signature, E::Id>, SinkError=E::Error>,
{
type Item = ();
type Error = E::Error;
fn poll(&mut self) -> Poll<(), E::Error> {
self.process_incoming()?;
self.prune_background_rounds()?;
self.global_out.poll()?;
match self.process_prospective_round()? {
Async::Ready(best_round_completable) => self.process_best_round(best_round_completable),
Async::NotReady => self.poll(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::prelude::FutureExt;
use tokio::runtime::current_thread;
use crate::SignedPrecommit;
use crate::testing::{self, GENESIS_HASH, Environment, Id};
use std::time::Duration;
#[test]
fn talking_to_myself() {
let local_id = Id(5);
let voters = std::iter::once((local_id, 100)).collect();
let (network, routing_task) = testing::make_network();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters,
global_comms,
0,
last_round_state,
last_finalized,
);
::tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
::tokio::spawn(exit.until(routing_task).map(|_| ()));
finalized
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
.map(|_| signal.fire())
})).unwrap();
}
#[test]
fn finalizing_at_fault_threshold() {
let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect();
let (network, routing_task) = testing::make_network();
let (signal, exit) = ::exit_future::signal();
current_thread::block_on_all(::futures::future::lazy(move || {
::tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
let finalized_streams = (0..7).map(move |i| {
let local_id = Id(i);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
last_round_state,
last_finalized,
);
::tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
finalized
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
});
::futures::future::join_all(finalized_streams).map(|_| signal.fire())
})).unwrap();
}
#[test]
fn broadcast_commit() {
let local_id = Id(5);
let voters: VoterSet<_> = std::iter::once((local_id, 100)).collect();
let (network, routing_task) = testing::make_network();
let (commits, _) = network.make_global_comms();
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
last_round_state,
last_finalized,
);
::tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
::tokio::spawn(exit.until(routing_task).map(|_| ()));
commits.take(1).for_each(|_| Ok(())).map(|_| signal.fire())
})).unwrap();
}
#[test]
fn broadcast_commit_only_if_newer() {
let local_id = Id(5);
let test_id = Id(42);
let voters: VoterSet<_> = [
(local_id, 100),
(test_id, 201),
].iter().cloned().collect();
let (network, routing_task) = testing::make_network();
let (commits_stream, commits_sink) = network.make_global_comms();
let (round_stream, round_sink) = network.make_round_comms(1, test_id);
let prevote = Message::Prevote(Prevote {
target_hash: "E",
target_number: 6,
});
let precommit = Message::Precommit(Precommit {
target_hash: "E",
target_number: 6,
});
let commit = (1, Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: testing::Signature(test_id.0),
id: test_id
}],
});
let (signal, exit) = ::exit_future::signal();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
last_round_state,
last_finalized,
);
::tokio::spawn(exit.clone()
.until(voter.map_err(|e| panic!("Error voting: {:?}", e))).map(|_| ()));
::tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
::tokio::spawn(exit.until(::futures::future::lazy(|| {
round_stream.into_future().map_err(|(e, _)| e)
.and_then(|(value, stream)| {
assert!(match value {
Some(SignedMessage { message: Message::Prevote(_), id: Id(5), .. }) => true,
_ => false,
});
let votes = vec![prevote, precommit].into_iter().map(Result::Ok);
round_sink.send_all(futures::stream::iter_result(votes)).map(|_| stream)
})
.and_then(|stream| {
stream.take_while(|value| match value {
SignedMessage { message: Message::Precommit(_), id: Id(5), .. } => Ok(false),
_ => Ok(true),
}).for_each(|_| Ok(()))
})
.and_then(|_| {
commits_sink.send(CommunicationOut::Commit(commit.0, commit.1))
})
.map_err(|_| ())
})).map(|_| ()));
commits_stream.into_future().map_err(|_| ())
.and_then(|(_, stream)| {
stream.take(1).for_each(|_| Ok(()))
.timeout(Duration::from_millis(500)).map_err(|_| ())
})
.then(|res| {
assert!(res.is_err());
signal.fire();
futures::future::ok::<(), ()>(())
})
})).unwrap();
}
#[test]
fn import_commit_for_any_round() {
let local_id = Id(5);
let test_id = Id(42);
let voters: VoterSet<_> = [
(local_id, 100),
(test_id, 201),
].iter().cloned().collect();
let (network, routing_task) = testing::make_network();
let (_, commits_sink) = network.make_global_comms();
let (signal, exit) = ::exit_future::signal();
let commit = (0, Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: testing::Signature(test_id.0),
id: test_id
}],
});
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
current_thread::block_on_all(::futures::future::lazy(move || {
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
1,
last_round_state,
last_finalized,
);
::tokio::spawn(exit.clone()
.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ()));
::tokio::spawn(exit.until(routing_task).map(|_| ()));
::tokio::spawn(commits_sink.send(CommunicationOut::Commit(commit.0, commit.1))
.map_err(|_| ()).map(|_| ()));
env.finalized_stream()
.take_while(|&(_, n, _)| Ok(n < 6))
.for_each(|_| Ok(()))
.map(|_| signal.fire())
})).unwrap();
}
#[test]
fn skips_to_latest_round() {
let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect();
let (network, routing_task) = testing::make_network();
let (signal, exit) = ::exit_future::signal();
current_thread::block_on_all(::futures::future::lazy(move || {
::tokio::spawn(exit.clone().until(routing_task).map(|_| ()));
let mut unsynced_voter = {
let local_id = Id(4);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
last_round_state,
last_finalized,
)
};
::tokio::spawn(::futures::future::poll_fn(move || {
if unsynced_voter.best_round.round_number() > 5 {
Ok(Async::Ready(()))
} else {
unsynced_voter.poll().map_err(|_| ())
}
}).map(|_| signal.fire()));
let synced_voters = (0..3).map(move |i| {
let local_id = Id(i);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let last_round_state = RoundState::genesis((GENESIS_HASH, 1));
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
5,
last_round_state,
last_finalized,
);
exit.clone()
.until(voter.map_err(|_| panic!("Error voting")))
.map(|_| ())
.map_err(|_| ())
});
::futures::future::join_all(synced_voters)
})).unwrap();
}
}