use std::marker::PhantomData;
use std::{cmp, iter};
use derivative::Derivative;
use failure::Fail;
use rand::distributions::{Distribution, Standard};
use rand::Rng;
use serde::{de::DeserializeOwned, Serialize};
use crate::crypto::{PublicKey, SecretKey};
use crate::dynamic_honey_badger::{
self, Batch as DhbBatch, DynamicHoneyBadger, FaultKind, JoinPlan, Message, Step as DhbStep,
};
use crate::transaction_queue::TransactionQueue;
use crate::{ConsensusProtocol, Contribution, NetworkInfo, NodeIdT};
pub use crate::dynamic_honey_badger::{Change, ChangeState, Input};
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "Input error: {}", _0)]
Input(dynamic_honey_badger::Error),
#[fail(display = "Handle message error: {}", _0)]
HandleMessage(dynamic_honey_badger::Error),
#[fail(display = "Propose error: {}", _0)]
Propose(dynamic_honey_badger::Error),
#[fail(display = "New joining error: {}", _0)]
NewJoining(dynamic_honey_badger::Error),
}
pub type Result<T> = ::std::result::Result<T, Error>;
pub struct QueueingHoneyBadgerBuilder<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned,
{
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
batch_size: usize,
queue: Q,
step: Option<DhbStep<Vec<T>, N>>,
_phantom: PhantomData<T>,
}
type QueueingHoneyBadgerWithStep<T, N, Q> = (QueueingHoneyBadger<T, N, Q>, Step<T, N>);
impl<T, N, Q> QueueingHoneyBadgerBuilder<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned,
Q: TransactionQueue<T>,
Standard: Distribution<N>,
{
pub fn new(dyn_hb: DynamicHoneyBadger<Vec<T>, N>) -> Self {
QueueingHoneyBadgerBuilder {
dyn_hb,
batch_size: 100,
queue: Default::default(),
step: None,
_phantom: PhantomData,
}
}
pub fn step(mut self, step: DhbStep<Vec<T>, N>) -> Self {
self.step = Some(step);
self
}
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
pub fn queue(mut self, queue: Q) -> Self {
self.queue = queue;
self
}
pub fn build<R: Rng>(self, rng: &mut R) -> Result<QueueingHoneyBadgerWithStep<T, N, Q>> {
self.build_with_transactions(None, rng)
}
pub fn build_with_transactions<TI, R>(
mut self,
txs: TI,
rng: &mut R,
) -> Result<QueueingHoneyBadgerWithStep<T, N, Q>>
where
TI: IntoIterator<Item = T>,
R: Rng,
{
self.queue.extend(txs);
let mut qhb = QueueingHoneyBadger {
dyn_hb: self.dyn_hb,
batch_size: self.batch_size,
queue: self.queue,
};
let mut step = qhb.propose(rng)?;
if let Some(dhb_step) = self.step {
step.extend(dhb_step);
}
Ok((qhb, step))
}
}
#[derive(Derivative)]
#[derivative(Debug)]
pub struct QueueingHoneyBadger<T, N: Ord, Q> {
batch_size: usize,
dyn_hb: DynamicHoneyBadger<Vec<T>, N>,
queue: Q,
}
pub type Step<T, N> = crate::Step<Message<N>, Batch<T, N>, N, FaultKind>;
impl<T, N, Q> ConsensusProtocol for QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned,
Q: TransactionQueue<T>,
Standard: Distribution<N>,
{
type NodeId = N;
type Input = Input<T, N>;
type Output = Batch<T, N>;
type Message = Message<N>;
type Error = Error;
type FaultKind = FaultKind;
fn handle_input<R: Rng>(&mut self, input: Self::Input, rng: &mut R) -> Result<Step<T, N>> {
match input {
Input::User(tx) => self.push_transaction(tx, rng),
Input::Change(change) => self.vote_for(change, rng),
}
}
fn handle_message<R: Rng>(
&mut self,
sender_id: &N,
message: Self::Message,
rng: &mut R,
) -> Result<Step<T, N>> {
self.handle_message(sender_id, message, rng)
}
fn terminated(&self) -> bool {
false
}
fn our_id(&self) -> &N {
self.dyn_hb.our_id()
}
}
impl<T, N, Q> QueueingHoneyBadger<T, N, Q>
where
T: Contribution + Serialize + DeserializeOwned + Clone,
N: NodeIdT + Serialize + DeserializeOwned,
Q: TransactionQueue<T>,
Standard: Distribution<N>,
{
pub fn builder(dyn_hb: DynamicHoneyBadger<Vec<T>, N>) -> QueueingHoneyBadgerBuilder<T, N, Q> {
QueueingHoneyBadgerBuilder::new(dyn_hb)
}
pub fn builder_joining<R: Rng>(
our_id: N,
secret_key: SecretKey,
join_plan: JoinPlan<N>,
rng: &mut R,
) -> Result<QueueingHoneyBadgerBuilder<T, N, Q>> {
let (dhb, step) = DynamicHoneyBadger::new_joining(our_id, secret_key, join_plan, rng)
.map_err(Error::NewJoining)?;
Ok(QueueingHoneyBadgerBuilder::new(dhb).step(step))
}
pub fn push_transaction<R: Rng>(&mut self, tx: T, rng: &mut R) -> Result<Step<T, N>> {
self.queue.extend(iter::once(tx));
self.propose(rng)
}
pub fn vote_for<R: Rng>(&mut self, change: Change<N>, rng: &mut R) -> Result<Step<T, N>> {
self.apply(|dyn_hb, _| dyn_hb.vote_for(change), rng)
}
pub fn vote_to_add<R: Rng>(
&mut self,
node_id: N,
pub_key: PublicKey,
rng: &mut R,
) -> Result<Step<T, N>> {
self.apply(|dyn_hb, _| dyn_hb.vote_to_add(node_id, pub_key), rng)
}
pub fn vote_to_remove<R: Rng>(&mut self, node_id: &N, rng: &mut R) -> Result<Step<T, N>> {
self.apply(|dyn_hb, _| dyn_hb.vote_to_remove(node_id), rng)
}
pub fn handle_message<R: Rng>(
&mut self,
sender_id: &N,
message: Message<N>,
rng: &mut R,
) -> Result<Step<T, N>> {
self.apply(
|dyn_hb, rng| dyn_hb.handle_message(sender_id, message, rng),
rng,
)
}
pub fn dyn_hb(&self) -> &DynamicHoneyBadger<Vec<T>, N> {
&self.dyn_hb
}
pub fn netinfo(&self) -> &NetworkInfo<N> {
self.dyn_hb.netinfo()
}
pub fn queue(&self) -> &Q {
&self.queue
}
fn apply<R, F>(&mut self, f: F, rng: &mut R) -> Result<Step<T, N>>
where
F: FnOnce(
&mut DynamicHoneyBadger<Vec<T>, N>,
&mut R,
) -> dynamic_honey_badger::Result<Step<T, N>>,
R: Rng,
{
let step = f(&mut self.dyn_hb, rng).map_err(Error::Input)?;
self.queue
.remove_multiple(step.output.iter().flat_map(Batch::iter));
Ok(step.join(self.propose(rng)?))
}
pub fn next_epoch(&self) -> u64 {
self.dyn_hb.next_epoch()
}
fn can_propose(&self) -> bool {
if self.dyn_hb.has_input() {
return false; }
!self.queue.is_empty() || self.dyn_hb.should_propose()
}
fn propose<R: Rng>(&mut self, rng: &mut R) -> Result<Step<T, N>> {
let mut step = Step::default();
while self.can_propose() {
let amount = cmp::max(1, self.batch_size / self.dyn_hb.netinfo().num_nodes());
let proposal = self.queue.choose(rng, amount, self.batch_size);
step.extend(
self.dyn_hb
.handle_input(Input::User(proposal), rng)
.map_err(Error::Propose)?,
);
}
Ok(step)
}
}
pub type Batch<T, N> = DhbBatch<Vec<T>, N>;