mod dynamic_honey_badger;
mod error;
mod honey_badger;
mod message;
mod queueing_honey_badger;
use rand::Rng;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use log::debug;
use crate::traits::EpochT;
use crate::{ConsensusProtocol, CpStep, Epoched, NodeIdT, Target};
pub use self::error::Error;
pub use self::message::Message;
/// A message that a sender queue can classify relative to a peer's epoch.
pub trait SenderQueueableMessage {
    /// The epoch type the message is compared against.
    type Epoch: EpochT;

    /// Returns `true` if the message is premature for a peer whose epoch is `them`;
    /// `max_future_epochs` is handed through to the implementation.
    fn is_premature(&self, them: Self::Epoch, max_future_epochs: u64) -> bool;

    /// Returns `true` if the message is obsolete for a peer whose epoch is `them`.
    fn is_obsolete(&self, them: Self::Epoch) -> bool;

    /// Returns `true` if the message is neither premature nor obsolete for `them`.
    ///
    /// `is_obsolete` is only consulted when the message is not premature.
    fn is_accepted(&self, them: Self::Epoch, max_future_epochs: u64) -> bool {
        let rejected = self.is_premature(them, max_future_epochs) || self.is_obsolete(them);
        !rejected
    }

    /// Returns the epoch under which the sender queue files this message when deferring it.
    fn first_epoch(&self) -> Self::Epoch;
}
/// An output type from which the sender queue can read participant changes and the epoch
/// the output belongs to.
pub trait SenderQueueableOutput<N, E>
where
N: NodeIdT,
{
/// Returns the next set of participants reported by this output. When this is `None`, the
/// sender queue leaves its participant bookkeeping untouched for this output.
fn participant_change(&self) -> Option<BTreeSet<N>>;
/// Returns the epoch of this output. The sender queue records it as the last epoch of any
/// node that this output drops from the participant set.
fn output_epoch(&self) -> E;
}
/// A consensus protocol that can be wrapped in a `SenderQueue`: it must be both `Epoched`
/// and a `ConsensusProtocol`.
pub trait SenderQueueableConsensusProtocol: Epoched + ConsensusProtocol {
/// Returns the look-ahead limit that the sender queue passes on when deciding which
/// outgoing messages have to be deferred.
fn max_future_epochs(&self) -> u64;
}
/// Deferred outgoing messages: for each target node ID, a map from epoch to the list of
/// messages filed under that epoch.
pub type OutgoingQueue<D> = BTreeMap<
<D as ConsensusProtocol>::NodeId,
BTreeMap<<D as Epoched>::Epoch, Vec<<D as ConsensusProtocol>::Message>>,
>;
/// A wrapper around a consensus protocol instance that tracks peer epochs and holds back
/// outgoing messages in a per-peer, per-epoch queue until they are released.
#[derive(Debug)]
pub struct SenderQueue<D>
where
D: SenderQueueableConsensusProtocol,
{
/// The wrapped protocol instance that inputs and messages are forwarded to.
algo: D,
/// This node's own ID.
our_id: D::NodeId,
/// Deferred outgoing messages, grouped by target node and then by the message's
/// `first_epoch()`.
outgoing_queue: OutgoingQueue<D>,
/// The latest known epoch of each tracked peer; raised by incoming `EpochStarted` messages.
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
/// Nodes scheduled for removal, each with the output epoch of the batch that dropped it
/// from the participant set.
last_epochs: BTreeMap<D::NodeId, D::Epoch>,
/// The participant set reported by the most recent output that carried a participant
/// change. Starts out empty.
participants_after_change: BTreeSet<D::NodeId>,
/// Set once this node itself has been removed. From then on inputs and messages are
/// ignored and `terminated()` returns `true`.
is_removed: bool,
}
/// The step type produced by a `SenderQueue` wrapping the protocol `D`.
pub type Step<D> = crate::CpStep<SenderQueue<D>>;
impl<D> ConsensusProtocol for SenderQueue<D>
where
D: SenderQueueableConsensusProtocol + Debug,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
type NodeId = D::NodeId;
type Input = D::Input;
type Output = D::Output;
type Message = Message<D::Message>;
type Error = Error<D::Error>;
type FaultKind = D::FaultKind;
fn handle_input<R: Rng>(
&mut self,
input: Self::Input,
rng: &mut R,
) -> Result<CpStep<Self>, Error<D::Error>> {
self.handle_input(input, rng)
}
fn handle_message<R: Rng>(
&mut self,
sender_id: &D::NodeId,
message: Self::Message,
rng: &mut R,
) -> Result<CpStep<Self>, Error<D::Error>> {
self.handle_message(sender_id, message, rng)
}
fn terminated(&self) -> bool {
self.is_removed
}
fn our_id(&self) -> &D::NodeId {
&self.our_id
}
}
impl<D> SenderQueue<D>
where
D: SenderQueueableConsensusProtocol + Debug,
D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
D::NodeId: NodeIdT,
D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
/// Returns a `SenderQueueBuilder` for `algo`, seeded with the peers yielded by `peer_ids`.
pub fn builder<I: Iterator<Item = D::NodeId>>(algo: D, peer_ids: I) -> SenderQueueBuilder<D> {
    SenderQueueBuilder::<D>::new(algo, peer_ids)
}
pub fn handle_input<R: Rng>(
&mut self,
input: D::Input,
rng: &mut R,
) -> Result<CpStep<Self>, Error<D::Error>> {
if self.is_removed {
return Ok(Step::<D>::default());
}
self.apply(|algo| algo.handle_input(input, rng))
}
pub fn handle_message<R: Rng>(
&mut self,
sender_id: &D::NodeId,
message: Message<D::Message>,
rng: &mut R,
) -> Result<CpStep<Self>, Error<D::Error>> {
if self.is_removed {
return Ok(Step::<D>::default());
}
match message {
Message::EpochStarted(epoch) => Ok(self.handle_epoch_started(sender_id, epoch)),
Message::Algo(msg) => self.handle_message_content(sender_id, msg, rng),
}
}
/// Returns a shared reference to the wrapped protocol instance.
pub fn inner(&self) -> &D {
&self.algo
}
/// Returns `true` once this node has been removed; from then on inputs and messages are
/// ignored.
pub fn is_removed(&self) -> bool {
self.is_removed
}
fn apply<F>(&mut self, f: F) -> Result<CpStep<Self>, Error<D::Error>>
where
F: FnOnce(&mut D) -> Result<CpStep<D>, D::Error>,
{
let mut step = f(&mut self.algo).map_err(Error::Apply)?;
let mut sender_queue_step = self.update_epoch(&step);
self.defer_messages(&mut step);
sender_queue_step.extend(step.map(|output| output, |fault| fault, Message::from));
Ok(sender_queue_step)
}
fn handle_epoch_started(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
self.peer_epochs
.entry(sender_id.clone())
.and_modify(|e| {
if *e < epoch {
*e = epoch;
}
})
.or_insert(epoch);
if !self.remove_participant_if_old(sender_id) {
self.process_new_epoch(sender_id, epoch)
} else {
Step::<D>::default()
}
}
/// Releases the messages queued for `sender_id` under epochs up to and including `epoch`.
///
/// The matching queue entries are removed. Messages that are obsolete with respect to
/// `epoch` are dropped; the rest are returned as messages targeted at `sender_id`. Returns
/// an empty step if nothing is queued for that node.
fn process_new_epoch(&mut self, sender_id: &D::NodeId, epoch: D::Epoch) -> CpStep<Self> {
    if let Some(queue) = self.outgoing_queue.get_mut(sender_id) {
        // Keys come out in ascending order, so the due epochs form a prefix of the map.
        let due_epochs: Vec<_> = queue
            .keys()
            .take_while(|queued_epoch| **queued_epoch <= epoch)
            .cloned()
            .collect();
        let mut released = Vec::new();
        for due in due_epochs {
            if let Some(messages) = queue.remove(&due) {
                released.extend(messages.into_iter().filter(|msg| !msg.is_obsolete(epoch)));
            }
        }
        released
            .into_iter()
            .map(|msg| Target::Node(sender_id.clone()).message(Message::Algo(msg)))
            .into()
    } else {
        CpStep::<Self>::default()
    }
}
fn handle_message_content<R: Rng>(
&mut self,
sender_id: &D::NodeId,
content: D::Message,
rng: &mut R,
) -> Result<CpStep<Self>, Error<D::Error>> {
self.apply(|algo| algo.handle_message(sender_id, content, rng))
}
/// Applies the participant changes reported by the outputs in `step` to the peer
/// bookkeeping and builds the step that announces our current epoch.
///
/// Returns an empty step if `step` has no outputs. Otherwise returns a step broadcasting
/// `Message::EpochStarted(self.algo.epoch())` to `Target::All`, unless this node is marked
/// as removed and none of the outputs in this call dropped it from the participant set.
fn update_epoch(&mut self, step: &CpStep<D>) -> CpStep<Self> {
if step.output.is_empty() {
return Step::<D>::default();
}
// Becomes `true` when an output drops our own ID from the participant set, so that the
// announcement below is still sent even if `is_removed` gets set during this call.
let mut send_last_epoch_started = false;
for batch in &step.output {
if let Some(next_participants) = batch.participant_change() {
for id in &next_participants {
if id != self.our_id() {
// Track every listed peer; a peer not yet known starts at the default epoch.
self.peer_epochs.entry(id.clone()).or_default();
// The peer is listed again: cancel a removal that was scheduled for an epoch
// before this output's epoch.
if let Some(&last) = self.last_epochs.get(id) {
if last < batch.output_epoch() {
self.last_epochs.remove(id);
}
}
}
}
debug!(
"Participants after the last change: {:?}",
self.participants_after_change
);
debug!("Next participants: {:?}", next_participants);
// Schedule removal of every node from the previous set that is missing from the new
// one. The set is cloned because the removal call needs `&mut self`.
for id in self
.participants_after_change
.clone()
.difference(&next_participants)
{
self.remove_participant_after(&id, &batch.output_epoch());
}
// We were in the previous set but are not in the new one.
if self.participants_after_change.contains(&self.our_id)
&& !next_participants.contains(&self.our_id)
{
send_last_epoch_started = true;
}
self.participants_after_change = next_participants;
}
}
if !self.is_removed || send_last_epoch_started {
// Announce our current epoch to everyone.
Target::All
.message(Message::EpochStarted(self.algo.epoch()))
.into()
} else {
// Already removed and not dropped by this step: send no announcement.
Step::<D>::default()
}
}
/// Moves the messages that `step.defer_messages` hands back onto the outgoing queue, filed
/// under their target node and their `first_epoch()`.
fn defer_messages(&mut self, step: &mut CpStep<D>) {
    let look_ahead = self.algo.max_future_epochs();
    let deferred = step.defer_messages(&self.peer_epochs, look_ahead);
    for (target_id, msg) in deferred {
        let per_epoch = self.outgoing_queue.entry(target_id).or_default();
        per_epoch.entry(msg.first_epoch()).or_default().push(msg);
    }
}
/// Schedules `id` for removal after `last_epoch` and immediately tries to complete the
/// removal. Returns `true` if the node was removed right away.
fn remove_participant_after(&mut self, id: &D::NodeId, last_epoch: &D::Epoch) -> bool {
    let scheduled_epoch = last_epoch.clone();
    self.last_epochs.insert(id.clone(), scheduled_epoch);
    self.remove_participant_if_old(id)
}
/// Completes a scheduled removal of `id` if it is due. Returns `true` if the node was
/// removed.
///
/// Returns `false` without changing anything when no removal is scheduled for `id`, when
/// our own epoch has not yet passed the scheduled last epoch, or, for a peer, when the
/// peer's recorded epoch has not yet passed it. Removing ourselves sets `is_removed`;
/// removing a peer drops its epoch entry and its queued messages.
fn remove_participant_if_old(&mut self, id: &D::NodeId) -> bool {
    let last_epoch = match self.last_epochs.get(id) {
        Some(epoch) => *epoch,
        None => return false,
    };
    if last_epoch >= self.algo.epoch() {
        return false;
    }
    if id == self.our_id() {
        self.is_removed = true;
    } else {
        // A peer without a recorded epoch does not block its own removal.
        let peer_not_past_last_epoch = self
            .peer_epochs
            .get(id)
            .map_or(false, |peer_epoch| last_epoch >= *peer_epoch);
        if peer_not_past_last_epoch {
            return false;
        }
        self.peer_epochs.remove(id);
        self.outgoing_queue.remove(id);
    }
    self.last_epochs.remove(id);
    true
}
/// Returns a shared reference to the wrapped protocol instance.
pub fn algo(&self) -> &D {
&self.algo
}
/// Returns a mutable reference to the wrapped protocol instance.
pub fn algo_mut(&mut self) -> &mut D {
&mut self.algo
}
}
/// A builder that collects the wrapped protocol instance and the initial peer epochs for a
/// `SenderQueue`.
pub struct SenderQueueBuilder<D>
where
D: SenderQueueableConsensusProtocol,
{
/// The protocol instance the sender queue will wrap.
algo: D,
/// The initial epoch of each peer the sender queue will track.
peer_epochs: BTreeMap<D::NodeId, D::Epoch>,
}
impl<D> SenderQueueBuilder<D>
where
    D: SenderQueueableConsensusProtocol + Debug,
    D::Message: Clone + SenderQueueableMessage<Epoch = D::Epoch>,
    D::NodeId: NodeIdT,
    D::Output: SenderQueueableOutput<D::NodeId, D::Epoch>,
{
    /// Creates a builder for `algo` in which every peer from `peer_ids` starts at the
    /// default epoch.
    pub fn new<I>(algo: D, peer_ids: I) -> Self
    where
        I: Iterator<Item = D::NodeId>,
    {
        let mut peer_epochs = BTreeMap::new();
        for peer_id in peer_ids {
            peer_epochs.insert(peer_id, D::Epoch::default());
        }
        SenderQueueBuilder { algo, peer_epochs }
    }

    /// Replaces the whole peer-epoch map with `peer_epochs`.
    pub fn peer_epochs(self, peer_epochs: BTreeMap<D::NodeId, D::Epoch>) -> Self {
        SenderQueueBuilder {
            peer_epochs,
            ..self
        }
    }

    /// Builds the sender queue for the node `our_id`.
    ///
    /// Returns the queue together with an initial step that broadcasts an `EpochStarted`
    /// message carrying the wrapped protocol's current epoch. The queue starts with no
    /// deferred messages, no scheduled removals, an empty participant set and
    /// `is_removed == false`.
    pub fn build(self, our_id: D::NodeId) -> (SenderQueue<D>, CpStep<SenderQueue<D>>) {
        let SenderQueueBuilder { algo, peer_epochs } = self;
        // Read the epoch before `algo` is moved into the queue.
        let current_epoch = algo.epoch();
        let announcement = Target::All.message(Message::EpochStarted(current_epoch));
        let sender_queue = SenderQueue {
            algo,
            our_id,
            outgoing_queue: BTreeMap::new(),
            peer_epochs,
            last_epochs: BTreeMap::new(),
            participants_after_change: BTreeSet::new(),
            is_removed: false,
        };
        (sender_queue, announcement.into())
    }
}