use std::collections::{HashMap, BTreeMap, VecDeque};
use std::collections::hash_map;
use std::fmt::Debug;
use std::hash::Hash;
use futures::{future, Future, Stream, Sink, Poll, Async, AsyncSink};
use log::trace;
#[cfg(any(test, feature="codec"))]
use parity_scale_codec::{Encode, Decode};
use self::accumulator::State;
pub use self::accumulator::{Accumulator, Justification, PrepareJustification, UncheckedJustification, Misbehavior};
pub mod accumulator;
#[cfg(test)]
mod tests;
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub enum Vote<D> {
Prepare(u32, D),
Commit(u32, D),
AdvanceRound(u32),
}
impl<D> Vote<D> {
pub fn round_number(&self) -> u32 {
match *self {
Vote::Prepare(round, _) => round,
Vote::Commit(round, _) => round,
Vote::AdvanceRound(round) => round,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub enum Message<C, D> {
Propose(u32, C),
Vote(Vote<D>),
}
impl<C, D> From<Vote<D>> for Message<C, D> {
fn from(vote: Vote<D>) -> Self {
Message::Vote(vote)
}
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub struct LocalizedProposal<C, D, V, S> {
pub round_number: u32,
pub proposal: C,
pub digest: D,
pub sender: V,
pub digest_signature: S,
pub full_signature: S,
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub struct LocalizedVote<D, V, S> {
pub vote: Vote<D>,
pub sender: V,
pub signature: S,
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub enum LocalizedMessage<C, D, V, S> {
Propose(LocalizedProposal<C, D, V, S>),
Vote(LocalizedVote<D, V, S>),
}
impl<C, D, V, S> LocalizedMessage<C, D, V, S> {
pub fn sender(&self) -> &V {
match *self {
LocalizedMessage::Propose(ref proposal) => &proposal.sender,
LocalizedMessage::Vote(ref vote) => &vote.sender,
}
}
pub fn round_number(&self) -> u32 {
match *self {
LocalizedMessage::Propose(ref proposal) => proposal.round_number,
LocalizedMessage::Vote(ref vote) => vote.vote.round_number(),
}
}
}
impl<C, D, V, S> From<LocalizedVote<D, V, S>> for LocalizedMessage<C, D, V, S> {
fn from(vote: LocalizedVote<D, V, S>) -> Self {
LocalizedMessage::Vote(vote)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub enum AdvanceRoundReason {
Timeout,
WasBehind,
}
pub trait Context {
type Error: From<InputStreamConcluded>;
type Candidate: Debug + Eq + Clone;
type Digest: Debug + Hash + Eq + Clone;
type AuthorityId: Debug + Hash + Eq + Clone;
type Signature: Debug + Eq + Clone;
type RoundTimeout: Future<Item=(), Error=Self::Error>;
type CreateProposal: Future<Item=Self::Candidate, Error=Self::Error>;
type EvaluateProposal: Future<Item=bool, Error=Self::Error>;
fn local_id(&self) -> Self::AuthorityId;
fn proposal(&self) -> Self::CreateProposal;
fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
-> LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>;
fn round_proposer(&self, round: u32) -> Self::AuthorityId;
fn proposal_valid(&self, proposal: &Self::Candidate) -> Self::EvaluateProposal;
fn begin_round_timeout(&self, round: u32) -> Self::RoundTimeout;
fn on_advance_round(
&self,
accumulator: &Accumulator<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>,
round: u32,
next_round: u32,
reason: AdvanceRoundReason,
) {
let _ = (accumulator, round, next_round, reason);
}
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub enum Communication<C, D, V, S> {
Consensus(LocalizedMessage<C, D, V, S>),
Auxiliary(PrepareJustification<D, S>),
}
pub trait TypeResolve {
type Communication;
}
impl<C: Context> TypeResolve for C {
type Communication = Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>;
}
#[derive(Debug)]
struct Sending<T> {
items: VecDeque<T>,
flushing: bool,
}
impl<T> Sending<T> {
fn with_capacity(n: usize) -> Self {
Sending {
items: VecDeque::with_capacity(n),
flushing: false,
}
}
fn push(&mut self, item: T) {
self.items.push_back(item);
}
fn process_all<S: Sink<SinkItem=T>>(&mut self, sink: &mut S) -> Poll<(), S::SinkError> {
loop {
while let Some(item) = self.items.pop_front() {
match sink.start_send(item) {
Err(e) => return Err(e),
Ok(AsyncSink::NotReady(item)) => {
self.items.push_front(item);
break;
}
Ok(AsyncSink::Ready) => {
self.flushing = true;
}
}
}
if self.flushing {
if let Async::Ready(()) = sink.poll_complete()? {
self.flushing = false;
}
}
match (self.flushing, self.items.len()) {
(true, _) => return Ok(Async::NotReady),
(false, pending) if pending == 0 => return Ok(Async::Ready(())),
(false, _) => continue,
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InputStreamConcluded;
impl ::std::fmt::Display for InputStreamConcluded {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "{}", ::std::error::Error::description(self))
}
}
impl ::std::error::Error for InputStreamConcluded {
fn description(&self) -> &str {
"input stream of messages concluded prematurely"
}
}
fn bft_threshold(nodes: usize, max_faulty: usize) -> usize {
nodes - max_faulty
}
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature="codec"), derive(Encode, Decode))]
pub struct Committed<C, D, S> {
pub candidate: Option<C>,
pub round_number: u32,
pub justification: Justification<D, S>,
}
struct Locked<D, S> {
justification: PrepareJustification<D, S>,
}
impl<D, S> Locked<D, S> {
fn digest(&self) -> &D {
&self.justification.digest
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LocalState {
Start,
Proposed,
Prepared(bool),
Committed,
VoteAdvance,
}
struct Strategy<C: Context> {
nodes: usize,
max_faulty: usize,
fetching_proposal: Option<C::CreateProposal>,
evaluating_proposal: Option<C::EvaluateProposal>,
round_timeout: Option<future::Fuse<C::RoundTimeout>>,
local_state: LocalState,
locked: Option<Locked<C::Digest, C::Signature>>,
notable_candidates: HashMap<C::Digest, C::Candidate>,
current_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
future_accumulators: BTreeMap<u32, Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>>,
local_id: C::AuthorityId,
misbehavior: HashMap<C::AuthorityId, Misbehavior<C::Digest, C::Signature>>,
earliest_lock_round: u32,
}
impl<C: Context> Strategy<C> {
fn create(context: &C, nodes: usize, max_faulty: usize) -> Self {
let threshold = bft_threshold(nodes, max_faulty);
let current_accumulator = Accumulator::new(
0,
threshold,
context.round_proposer(0),
);
Strategy {
nodes,
max_faulty,
current_accumulator,
future_accumulators: BTreeMap::new(),
fetching_proposal: None,
evaluating_proposal: None,
local_state: LocalState::Start,
locked: None,
notable_candidates: HashMap::new(),
round_timeout: None,
local_id: context.local_id(),
misbehavior: HashMap::new(),
earliest_lock_round: 0,
}
}
fn current_round(&self) -> u32 {
self.current_accumulator.round_number()
}
fn import_message(
&mut self,
context: &C,
msg: LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>
) {
let round_number = msg.round_number();
let sender = msg.sender().clone();
let current_round = self.current_round();
let misbehavior = if round_number == current_round {
self.current_accumulator.import_message(msg)
} else if round_number > current_round {
let threshold = bft_threshold(self.nodes, self.max_faulty);
let future_acc = self.future_accumulators.entry(round_number).or_insert_with(|| {
Accumulator::new(
round_number,
threshold,
context.round_proposer(round_number),
)
});
future_acc.import_message(msg)
} else {
Ok(())
};
if let Err(misbehavior) = misbehavior {
self.misbehavior.insert(sender, misbehavior);
}
}
fn import_lock_proof(
&mut self,
context: &C,
justification: PrepareJustification<C::Digest, C::Signature>,
) {
if justification.round_number > self.current_round() {
self.advance_to_round(context, justification.round_number, AdvanceRoundReason::WasBehind);
}
let lock_to_new = justification.round_number >= self.earliest_lock_round;
if lock_to_new {
self.earliest_lock_round = justification.round_number;
self.locked = Some(Locked { justification })
}
}
fn poll(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, C::Error>
{
let mut last_watermark = (self.current_round(), self.local_state);
loop {
trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark);
match self.poll_once(context, sending)? {
Async::Ready(x) => return Ok(Async::Ready(x)),
Async::NotReady => {
let new_watermark = (self.current_round(), self.local_state);
if new_watermark == last_watermark {
return Ok(Async::NotReady)
} else {
last_watermark = new_watermark;
}
}
}
}
}
fn poll_once(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, C::Error>
{
self.propose(context, sending)?;
self.prepare(context, sending)?;
self.commit(context, sending);
self.vote_advance(context, sending)?;
let advance = match self.current_accumulator.state() {
&State::Advanced(ref p_just) => {
if let Some(p_just) = p_just.as_ref() {
self.locked = Some(Locked { justification: p_just.clone() });
}
let round_number = self.current_round();
Some(round_number + 1)
}
&State::Committed(ref just) => {
let candidate = self.current_accumulator
.proposal()
.and_then(|c| if context.candidate_digest(c) == just.digest {
Some(c.clone())
} else {
None
})
.or_else(|| self.notable_candidates.get(&just.digest).cloned());
let committed = Committed {
candidate,
round_number: self.current_accumulator.round_number(),
justification: just.clone()
};
return Ok(Async::Ready(committed))
}
_ => None,
};
if let Some(new_round) = advance {
self.advance_to_round(context, new_round, AdvanceRoundReason::Timeout);
}
Ok(Async::NotReady)
}
fn propose(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
if let LocalState::Start = self.local_state {
let mut propose = false;
if let &State::Begin = self.current_accumulator.state() {
let round_number = self.current_round();
let primary = context.round_proposer(round_number);
propose = self.local_id == primary;
};
if !propose { return Ok(()) }
let proposal = match self.locked {
Some(ref locked) => {
self.notable_candidates.get(locked.digest()).cloned()
}
None => {
let res = self.fetching_proposal
.get_or_insert_with(|| context.proposal())
.poll()?;
match res {
Async::Ready(p) => Some(p),
Async::NotReady => None,
}
}
};
if let Some(proposal) = proposal {
self.fetching_proposal = None;
let message = Message::Propose(
self.current_round(),
proposal
);
self.import_and_send_message(message, context, sending);
if let Some(ref locked) = self.locked {
sending.push(
Communication::Auxiliary(locked.justification.clone())
);
}
self.local_state = LocalState::Proposed;
}
}
Ok(())
}
fn prepare(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
match self.local_state {
LocalState::Start | LocalState::Proposed => {},
_ => return Ok(())
};
let mut prepare_for = None;
if let &State::Proposed(ref candidate) = self.current_accumulator.state() {
let digest = context.candidate_digest(candidate);
match &mut self.locked {
&mut Some(ref locked) if locked.digest() != &digest => {}
locked => {
let res = self.evaluating_proposal
.get_or_insert_with(|| context.proposal_valid(candidate))
.poll()?;
if let Async::Ready(valid) = res {
self.evaluating_proposal = None;
self.local_state = LocalState::Prepared(valid);
if valid {
prepare_for = Some(digest);
} else {
if locked.as_ref().map_or(false, |locked| locked.digest() == &digest) {
*locked = None;
self.earliest_lock_round = ::std::cmp::max(
self.current_accumulator.round_number(),
self.earliest_lock_round,
);
}
}
}
}
}
}
if let Some(digest) = prepare_for {
let message = Vote::Prepare(
self.current_round(),
digest
).into();
self.import_and_send_message(message, context, sending);
}
Ok(())
}
fn commit(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
match self.local_state {
LocalState::Committed | LocalState::VoteAdvance => return,
_ => {}
}
let mut commit_for = None;
let thought_good = match self.local_state {
LocalState::Prepared(good) => good,
_ => true,
};
if let &State::Prepared(ref p_just) = self.current_accumulator.state() {
self.earliest_lock_round = self.current_accumulator.round_number();
if thought_good {
let digest = p_just.digest.clone();
self.locked = Some(Locked { justification: p_just.clone() });
commit_for = Some(digest);
}
}
if let Some(digest) = commit_for {
let message = Vote::Commit(
self.current_round(),
digest
).into();
self.import_and_send_message(message, context, sending);
self.local_state = LocalState::Committed;
}
}
fn vote_advance(
&mut self,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
)
-> Result<(), C::Error>
{
if let LocalState::VoteAdvance = self.local_state { return Ok(()) }
let mut attempt_advance = self.current_accumulator.advance_votes() > self.max_faulty;
if let LocalState::Prepared(false) = self.local_state {
attempt_advance = true;
}
let round_number = self.current_accumulator.round_number();
let timer_res = self.round_timeout
.get_or_insert_with(|| context.begin_round_timeout(round_number).fuse())
.poll()?;
if let Async::Ready(_) = timer_res { attempt_advance = true }
if attempt_advance {
let message = Vote::AdvanceRound(
self.current_round(),
).into();
self.import_and_send_message(message, context, sending);
self.local_state = LocalState::VoteAdvance;
}
Ok(())
}
fn advance_to_round(&mut self, context: &C, round: u32, reason: AdvanceRoundReason) {
assert!(round > self.current_round());
trace!(target: "bft", "advancing to round {}", round);
self.fetching_proposal = None;
self.evaluating_proposal = None;
self.round_timeout = None;
self.local_state = LocalState::Start;
context.on_advance_round(
&self.current_accumulator,
self.current_round(),
round,
reason,
);
if let Some(proposal) = self.current_accumulator.proposal() {
let digest = context.candidate_digest(proposal);
self.notable_candidates.entry(digest).or_insert_with(|| proposal.clone());
}
for irrelevant in (self.current_round() + 1)..round {
self.future_accumulators.remove(&irrelevant);
}
self.current_accumulator = match self.future_accumulators.remove(&round) {
Some(x) => x,
None => Accumulator::new(
round,
bft_threshold(self.nodes, self.max_faulty),
context.round_proposer(round),
),
};
}
fn import_and_send_message(
&mut self,
message: Message<C::Candidate, C::Digest>,
context: &C,
sending: &mut Sending<<C as TypeResolve>::Communication>
) {
let signed_message = context.sign_local(message);
self.import_message(context, signed_message.clone());
sending.push(Communication::Consensus(signed_message));
}
}
#[must_use = "futures do nothing unless polled"]
pub struct Agreement<C: Context, I, O> {
context: C,
input: I,
output: O,
concluded: Option<Committed<C::Candidate, C::Digest, C::Signature>>,
sending: Sending<<C as TypeResolve>::Communication>,
strategy: Strategy<C>,
}
impl<C, I, O> Future for Agreement<C, I, O>
where
C: Context,
I: Stream<Item=<C as TypeResolve>::Communication,Error=C::Error>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=C::Error>,
{
type Item = Committed<C::Candidate, C::Digest, C::Signature>;
type Error = C::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(just) = self.concluded.take() {
return Ok(match self.sending.process_all(&mut self.output)? {
Async::Ready(()) => Async::Ready(just),
Async::NotReady => {
self.concluded = Some(just);
Async::NotReady
}
})
}
let mut driving = true;
while driving {
driving = match self.input.poll()? {
Async::Ready(msg) => {
match msg.ok_or(InputStreamConcluded)? {
Communication::Consensus(message) => self.strategy.import_message(&self.context, message),
Communication::Auxiliary(lock_proof)
=> self.strategy.import_lock_proof(&self.context, lock_proof),
}
true
}
Async::NotReady => false,
};
if let Async::Ready(just) = self.strategy.poll(&self.context, &mut self.sending)? {
self.concluded = Some(just);
return self.poll();
}
}
let _ = self.sending.process_all(&mut self.output)?;
Ok(Async::NotReady)
}
}
impl<C: Context, I, O> Agreement<C, I, O> {
pub fn context(&self) -> &C {
&self.context
}
pub fn drain_misbehavior(&mut self) -> hash_map::Drain<C::AuthorityId, Misbehavior<C::Digest, C::Signature>> {
self.strategy.misbehavior.drain()
}
pub fn fast_forward(&mut self, round: u32) {
if round > self.strategy.current_round() {
self.strategy.advance_to_round(&self.context, round, AdvanceRoundReason::WasBehind);
self.strategy.earliest_lock_round = round;
}
}
}
pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
-> Agreement<C, I, O>
where
C: Context,
I: Stream<Item=<C as TypeResolve>::Communication,Error=C::Error>,
O: Sink<SinkItem=<C as TypeResolve>::Communication,SinkError=C::Error>,
{
let strategy = Strategy::create(&context, nodes, max_faulty);
Agreement {
context,
input,
output,
concluded: None,
sending: Sending::with_capacity(4),
strategy: strategy,
}
}