use std::collections::BTreeMap;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter::once;
use failure::Fail;
use rand::Rng;
use serde::{de::DeserializeOwned, Serialize};
use crate::fault_log::{Fault, FaultLog};
use crate::sender_queue::SenderQueueableMessage;
use crate::{Target, TargetedMessage};
pub trait Contribution: Eq + Debug + Hash + Send + Sync {}
impl<C> Contribution for C where C: Eq + Debug + Hash + Send + Sync {}
pub trait NodeIdT: Eq + Ord + Clone + Debug + Hash + Send + Sync {}
impl<N> NodeIdT for N where N: Eq + Ord + Clone + Debug + Hash + Send + Sync {}
pub trait FaultT: Clone + Debug + Fail + PartialEq {}
impl<N> FaultT for N where N: Clone + Debug + Fail + PartialEq {}
pub trait Message: Debug + Send + Sync {}
impl<M> Message for M where M: Debug + Send + Sync {}
pub trait SessionIdT: Display + Serialize + Send + Sync + Clone + Debug {}
impl<S> SessionIdT for S where S: Display + Serialize + Send + Sync + Clone + Debug {}
pub trait EpochT: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
impl<E> EpochT for E where E: Copy + Message + Default + Eq + Ord + Serialize + DeserializeOwned {}
#[must_use = "The algorithm step result must be used."]
#[derive(Debug)]
pub struct Step<M, O, N, F: Fail> {
pub output: Vec<O>,
pub fault_log: FaultLog<N, F>,
pub messages: Vec<TargetedMessage<M, N>>,
}
impl<M, O, N, F> Default for Step<M, O, N, F>
where
F: Fail,
{
fn default() -> Self {
Step {
output: Vec::default(),
fault_log: FaultLog::default(),
messages: Vec::default(),
}
}
}
impl<M, O, N, F> Step<M, O, N, F>
where
F: Fail,
{
pub fn new(
output: Vec<O>,
fault_log: FaultLog<N, F>,
messages: Vec<TargetedMessage<M, N>>,
) -> Self {
Step {
output,
fault_log,
messages,
}
}
pub fn with_output<T: Into<Option<O>>>(mut self, output: T) -> Self {
self.output.extend(output.into());
self
}
pub fn map<M2, O2, F2, FO, FF, FM>(
self,
f_out: FO,
f_fail: FF,
f_msg: FM,
) -> Step<M2, O2, N, F2>
where
F2: Fail,
FO: Fn(O) -> O2,
FF: Fn(F) -> F2,
FM: Fn(M) -> M2,
{
Step {
output: self.output.into_iter().map(f_out).collect(),
fault_log: self.fault_log.map(f_fail),
messages: self.messages.into_iter().map(|tm| tm.map(&f_msg)).collect(),
}
}
#[must_use]
pub fn extend_with<M2, O2, F2, FF, FM>(
&mut self,
other: Step<M2, O2, N, F2>,
f_fail: FF,
f_msg: FM,
) -> Vec<O2>
where
F2: Fail,
FF: Fn(F2) -> F,
FM: Fn(M2) -> M,
{
let fails = other.fault_log.map(f_fail);
self.fault_log.extend(fails);
let msgs = other.messages.into_iter().map(|tm| tm.map(&f_msg));
self.messages.extend(msgs);
other.output
}
pub fn extend(&mut self, other: Self) {
self.output.extend(other.output);
self.fault_log.extend(other.fault_log);
self.messages.extend(other.messages);
}
pub fn join(mut self, other: Self) -> Self {
self.extend(other);
self
}
pub fn is_empty(&self) -> bool {
self.output.is_empty() && self.fault_log.is_empty() && self.messages.is_empty()
}
}
impl<M, O, N, F> From<FaultLog<N, F>> for Step<M, O, N, F>
where
F: Fail,
{
fn from(fault_log: FaultLog<N, F>) -> Self {
Step {
fault_log,
..Step::default()
}
}
}
impl<M, O, N, F> From<Fault<N, F>> for Step<M, O, N, F>
where
F: Fail,
{
fn from(fault: Fault<N, F>) -> Self {
Step {
fault_log: fault.into(),
..Step::default()
}
}
}
impl<M, O, N, F> From<TargetedMessage<M, N>> for Step<M, O, N, F>
where
F: Fail,
{
fn from(msg: TargetedMessage<M, N>) -> Self {
Step {
messages: once(msg).collect(),
..Step::default()
}
}
}
impl<I, M, O, N, F> From<I> for Step<M, O, N, F>
where
I: IntoIterator<Item = TargetedMessage<M, N>>,
F: Fail,
{
fn from(msgs: I) -> Self {
Step {
messages: msgs.into_iter().collect(),
..Step::default()
}
}
}
pub trait Epoched {
type Epoch: EpochT;
fn epoch(&self) -> Self::Epoch;
}
pub type CpStep<D> = Step<
<D as ConsensusProtocol>::Message,
<D as ConsensusProtocol>::Output,
<D as ConsensusProtocol>::NodeId,
<D as ConsensusProtocol>::FaultKind,
>;
impl<'i, M, O, N, F> Step<M, O, N, F>
where
N: NodeIdT,
M: 'i + Clone + SenderQueueableMessage,
F: Fail,
{
pub fn defer_messages(
&mut self,
peer_epochs: &BTreeMap<N, M::Epoch>,
max_future_epochs: u64,
) -> Vec<(N, M)> {
let mut deferred_msgs: Vec<(N, M)> = Vec::new();
let mut passed_msgs: Vec<_> = Vec::new();
for msg in self.messages.drain(..) {
match msg.target.clone() {
Target::Node(id) => {
if let Some(&them) = peer_epochs.get(&id) {
if msg.message.is_premature(them, max_future_epochs) {
deferred_msgs.push((id, msg.message));
} else if !msg.message.is_obsolete(them) {
passed_msgs.push(msg);
}
}
}
Target::All => {
let is_accepted = |&them| msg.message.is_accepted(them, max_future_epochs);
let is_premature = |&them| msg.message.is_premature(them, max_future_epochs);
let is_obsolete = |&them| msg.message.is_obsolete(them);
if peer_epochs.values().all(is_accepted) {
passed_msgs.push(msg);
} else {
for (id, them) in peer_epochs {
if is_premature(them) {
deferred_msgs.push((id.clone(), msg.message.clone()));
} else if !is_obsolete(them) {
passed_msgs
.push(Target::Node(id.clone()).message(msg.message.clone()));
}
}
}
}
}
}
self.messages.extend(passed_msgs);
deferred_msgs
}
}
pub trait ConsensusProtocol: Send + Sync {
type NodeId: NodeIdT;
type Input;
type Output;
type Message: Message;
type Error: Fail;
type FaultKind: FaultT;
fn handle_input<R: Rng>(
&mut self,
input: Self::Input,
rng: &mut R,
) -> Result<CpStep<Self>, Self::Error>
where
Self: Sized;
fn handle_message<R: Rng>(
&mut self,
sender_id: &Self::NodeId,
message: Self::Message,
rng: &mut R,
) -> Result<CpStep<Self>, Self::Error>
where
Self: Sized;
fn terminated(&self) -> bool;
fn our_id(&self) -> &Self::NodeId;
}