#![allow(clippy::write_with_newline)]
#![allow(clippy::type_complexity)]
#![allow(clippy::new_ret_no_self)]
pub mod adversary;
pub mod err;
pub mod proptest;
pub mod util;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io::Write;
use std::{cmp, env, fmt, fs, io, ops, process, time};
use rand::{self, Rng};
use hbbft::dynamic_honey_badger::Batch;
use hbbft::sender_queue::SenderQueueableOutput;
use hbbft::{self, ConsensusProtocol, Contribution, CpStep, Fault, NetworkInfo, NodeIdT, Step};
use crate::try_some;
pub use self::adversary::Adversary;
pub use self::err::CrankError;
const DEFAULT_TIME_LIMIT: Option<time::Duration> = Some(time::Duration::from_secs(60 * 5));
macro_rules! net_trace {
($self:expr, $fmt:expr, $($arg:tt)*) => (
if let Some(ref mut dest) = $self.trace {
write!(dest, $fmt, $($arg)*).expect("could not write to test's trace")
});
}
fn open_trace() -> Result<io::BufWriter<fs::File>, io::Error> {
let mut rng = rand::thread_rng();
let exec_path = env::current_exe()?;
let name = format!(
"net-trace_{}_{}_{}.txt",
exec_path
.file_name()
.expect("could not get executable filename")
.to_string_lossy()
.into_owned(),
process::id(),
rng.gen::<u16>()
);
Ok(io::BufWriter::new(fs::File::create(name)?))
}
pub struct Node<D: ConsensusProtocol> {
algorithm: D,
is_faulty: bool,
outputs: Vec<D::Output>,
faults: Vec<Fault<D::NodeId, D::FaultKind>>,
}
impl<D> fmt::Debug for Node<D>
where
D: ConsensusProtocol,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Node")
.field("algorithm", &"yes")
.field("is_faulty", &self.is_faulty)
.field("outputs", &self.outputs.len())
.finish()
}
}
impl<D: ConsensusProtocol> Node<D> {
#[inline]
fn new(algorithm: D, is_faulty: bool) -> Self {
Node {
algorithm,
is_faulty,
outputs: Vec::new(),
faults: Vec::new(),
}
}
#[inline]
pub fn algorithm(&self) -> &D {
&self.algorithm
}
#[inline]
pub fn algorithm_mut(&mut self) -> &mut D {
&mut self.algorithm
}
#[inline]
pub fn is_faulty(&self) -> bool {
self.is_faulty
}
#[inline]
pub fn id(&self) -> &D::NodeId {
self.algorithm.our_id()
}
#[inline]
pub fn outputs(&self) -> &[D::Output] {
self.outputs.as_slice()
}
#[inline]
pub fn faults(&self) -> &[Fault<D::NodeId, D::FaultKind>] {
self.faults.as_slice()
}
fn store_step(&mut self, step: &CpStep<D>)
where
D::Output: Clone,
{
self.outputs.extend(step.output.iter().cloned());
self.faults.extend(step.fault_log.0.iter().cloned());
}
}
#[derive(Clone, Debug)]
pub struct NetworkMessage<M, N> {
from: N,
to: N,
payload: M,
}
impl<M, N> NetworkMessage<M, N> {
#[inline]
pub fn new(from: N, payload: M, to: N) -> NetworkMessage<M, N> {
NetworkMessage { from, to, payload }
}
#[inline]
pub fn from(&self) -> &N {
&self.from
}
#[inline]
pub fn to(&self) -> &N {
&self.to
}
#[inline]
pub fn payload(&self) -> &M {
&self.payload
}
}
pub type NodeMap<D> = BTreeMap<<D as ConsensusProtocol>::NodeId, Node<D>>;
pub type NetMessage<D> =
NetworkMessage<<D as ConsensusProtocol>::Message, <D as ConsensusProtocol>::NodeId>;
#[allow(clippy::needless_pass_by_value)]
fn process_step<'a, D>(
nodes: &'a mut BTreeMap<D::NodeId, Node<D>>,
stepped_id: D::NodeId,
step: &CpStep<D>,
dest: &mut VecDeque<NetMessage<D>>,
error_on_fault: bool,
) -> Result<usize, CrankError<D>>
where
D: ConsensusProtocol + 'a,
D::Message: Clone,
D::Output: Clone,
{
let faulty = nodes
.get(&stepped_id)
.expect("Trying to process a step with non-existing node ID")
.is_faulty();
let mut message_count: usize = 0;
for tmsg in &step.messages {
match &tmsg.target {
hbbft::Target::Node(to) => {
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
hbbft::Target::All => {
for to in nodes.keys().filter(|&to| to != &stepped_id) {
if !faulty {
message_count = message_count.saturating_add(1);
}
dest.push_back(NetworkMessage::new(
stepped_id.clone(),
tmsg.message.clone(),
to.clone(),
));
}
}
}
}
nodes
.get_mut(&stepped_id)
.expect("Trying to process a step with non-existing node ID")
.store_step(step);
if error_on_fault && !nodes[&stepped_id].is_faulty() {
for fault in &step.fault_log.0 {
if nodes.get(&fault.node_id).map_or(false, |n| !n.is_faulty()) {
return Err(CrankError::Fault {
reported_by: stepped_id.clone(),
faulty_id: fault.node_id.clone(),
fault_kind: fault.kind.clone(),
});
}
}
}
Ok(message_count)
}
#[derive(Debug)]
pub struct NewNodeInfo<D>
where
D: ConsensusProtocol,
{
pub id: D::NodeId,
pub netinfo: NetworkInfo<D::NodeId>,
pub faulty: bool,
}
pub struct NetBuilder<D, I, A>
where
D: ConsensusProtocol,
{
node_ids: I,
num_faulty: usize,
cons: Option<Box<dyn Fn(NewNodeInfo<D>) -> (D, CpStep<D>)>>,
adversary: Option<A>,
trace: Option<bool>,
crank_limit: Option<usize>,
message_limit: Option<usize>,
time_limit: Option<time::Duration>,
error_on_fault: bool,
}
impl<D, I, A> fmt::Debug for NetBuilder<D, I, A>
where
D: ConsensusProtocol,
A: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetBuilder")
.field("node_ids", &())
.field("num_faulty", &self.num_faulty)
.field("cons", &self.cons.is_some())
.field("adversary", &self.adversary)
.field("trace", &self.trace)
.field("crank_limit", &self.crank_limit)
.field("message_limit", &self.message_limit)
.field("time_limit", &self.time_limit)
.field("error_on_fault", &self.error_on_fault)
.finish()
}
}
impl<D, I, A> NetBuilder<D, I, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Output: Clone,
I: IntoIterator<Item = D::NodeId>,
A: Adversary<D>,
{
#[inline]
pub fn new(node_ids: I) -> Self {
NetBuilder {
node_ids,
num_faulty: 0,
cons: None,
adversary: None,
trace: None,
crank_limit: None,
message_limit: None,
time_limit: DEFAULT_TIME_LIMIT,
error_on_fault: true,
}
}
#[inline]
pub fn adversary(mut self, adversary: A) -> Self {
self.adversary = Some(adversary);
self
}
#[inline]
pub fn crank_limit(mut self, crank_limit: usize) -> Self {
self.crank_limit = Some(crank_limit);
self
}
#[inline]
pub fn message_limit(mut self, message_limit: usize) -> Self {
self.message_limit = Some(message_limit);
self
}
#[inline]
pub fn no_time_limit(mut self) -> Self {
self.time_limit = None;
self
}
#[inline]
pub fn num_faulty(mut self, num_faulty: usize) -> Self {
self.num_faulty = num_faulty;
self
}
#[inline]
pub fn time_limit(mut self, limit: time::Duration) -> Self {
self.time_limit = Some(limit);
self
}
#[inline]
pub fn trace(mut self, trace: bool) -> Self {
self.trace = Some(trace);
self
}
#[inline]
pub fn error_on_fault(mut self, error_on_fault: bool) -> Self {
self.error_on_fault = error_on_fault;
self
}
#[inline]
pub fn using_step<F>(mut self, cons: F) -> Self
where
F: Fn(NewNodeInfo<D>) -> (D, CpStep<D>) + 'static,
{
self.cons = Some(Box::new(cons));
self
}
#[inline]
pub fn using<F>(self, cons_simple: F) -> Self
where
F: Fn(NewNodeInfo<D>) -> D + 'static,
{
self.using_step(move |node| (cons_simple(node), Default::default()))
}
#[inline]
pub fn build<R: Rng>(
self,
rng: &mut R,
) -> Result<(VirtualNet<D, A>, Vec<(D::NodeId, CpStep<D>)>), CrankError<D>> {
let override_time_limit = env::var("HBBFT_NO_TIME_LIMIT")
.map(|s| s.parse().expect("could not parse `HBBFT_NO_TIME_LIMIT`"))
.unwrap_or(false);
let time_limit = if override_time_limit {
eprintln!("WARNING: The time limit for individual tests has been manually disabled through `HBBFT_NO_TIME_LIMIT`.");
None
} else {
self.time_limit
};
let cons = self
.cons
.as_ref()
.expect("cannot build network without a constructor function for the nodes");
#[allow(clippy::redundant_closure)]
let (mut net, steps) = VirtualNet::new(
self.node_ids,
self.num_faulty as usize,
rng,
move |node| cons(node),
self.error_on_fault,
)?;
if self.adversary.is_some() {
net.adversary = self.adversary;
}
let trace = self.trace.unwrap_or_else(|| {
match env::var("HBBFT_TEST_TRACE").as_ref().map(|s| s.as_str()) {
Ok("true") | Ok("1") => true,
_ => false,
}
});
if trace {
net.trace = Some(open_trace().expect("could not open trace file"));
}
net.crank_limit = self.crank_limit;
net.message_limit = self.message_limit;
net.time_limit = time_limit;
Ok((net, steps))
}
}
#[derive(Debug)]
pub struct VirtualNet<D, A>
where
D: ConsensusProtocol,
A: Adversary<D>,
D::Message: Clone,
D::Output: Clone,
{
nodes: NodeMap<D>,
messages: VecDeque<NetMessage<D>>,
adversary: Option<A>,
trace: Option<io::BufWriter<fs::File>>,
crank_count: usize,
crank_limit: Option<usize>,
message_count: usize,
message_limit: Option<usize>,
time_limit: Option<time::Duration>,
start_time: time::Instant,
error_on_fault: bool,
removed_nodes: BTreeSet<D::NodeId>,
}
impl<D, A> VirtualNet<D, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Output: Clone,
A: Adversary<D>,
{
#[inline]
pub fn nodes(&self) -> impl Iterator<Item = &Node<D>> {
self.nodes.values()
}
#[inline]
pub fn nodes_mut(&mut self) -> impl Iterator<Item = &mut Node<D>> {
self.nodes.values_mut()
}
#[inline]
pub fn faulty_nodes(&self) -> impl Iterator<Item = &Node<D>> {
self.nodes().filter(|n| n.is_faulty())
}
#[inline]
pub fn faulty_nodes_mut(&mut self) -> impl Iterator<Item = &mut Node<D>> {
self.nodes_mut().filter(|n| n.is_faulty())
}
#[inline]
pub fn correct_nodes(&self) -> impl Iterator<Item = &Node<D>> {
self.nodes().filter(|n| !n.is_faulty())
}
#[inline]
pub fn correct_nodes_mut(&mut self) -> impl Iterator<Item = &mut Node<D>> {
self.nodes_mut().filter(|n| !n.is_faulty())
}
#[inline]
pub fn insert_node(&mut self, node: Node<D>) -> Option<Node<D>> {
self.removed_nodes.remove(node.id());
self.nodes.insert(node.id().clone(), node)
}
#[inline]
pub fn remove_node(&mut self, id: &D::NodeId) -> Option<Node<D>> {
self.removed_nodes.insert(id.clone());
self.messages.retain(|msg| msg.to != *id);
self.nodes.remove(id)
}
#[inline]
pub fn remove_nodes(&mut self, ids: &BTreeSet<D::NodeId>) -> Vec<Node<D>> {
ids.iter().filter_map(|id| self.remove_node(id)).collect()
}
#[inline]
#[allow(clippy::needless_pass_by_value)]
pub fn get<'a>(&'a self, id: D::NodeId) -> Option<&'a Node<D>> {
self.nodes.get(&id)
}
#[inline]
#[allow(clippy::needless_pass_by_value)]
pub fn get_mut<'a>(&'a mut self, id: D::NodeId) -> Option<&'a mut Node<D>> {
self.nodes.get_mut(&id)
}
#[inline]
pub fn messages(&self) -> impl Iterator<Item = &NetMessage<D>> {
self.messages.iter()
}
#[inline]
pub fn messages_mut(&mut self) -> impl Iterator<Item = &mut NetMessage<D>> {
self.messages.iter_mut()
}
#[inline]
pub fn messages_len(&self) -> usize {
self.messages.len()
}
#[inline]
pub fn swap_messages(&mut self, i: usize, j: usize) {
self.messages.swap(i, j)
}
#[inline]
pub fn sort_messages_by<F>(&mut self, f: F)
where
F: FnMut(&NetMessage<D>, &NetMessage<D>) -> cmp::Ordering,
{
let l = self.messages.len();
let mut msgs: Vec<_> = self.messages.drain(0..l).collect();
msgs.sort_by(f);
self.messages.extend(msgs);
}
}
impl<D, A> VirtualNet<D, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Output: Clone,
A: Adversary<D>,
{
fn new<F, I, R>(
node_ids: I,
faulty: usize,
mut rng: R,
cons: F,
error_on_fault: bool,
) -> Result<(Self, Vec<(D::NodeId, CpStep<D>)>), CrankError<D>>
where
F: Fn(NewNodeInfo<D>) -> (D, CpStep<D>),
I: IntoIterator<Item = D::NodeId>,
R: rand::Rng,
{
let net_infos = NetworkInfo::generate_map(node_ids, &mut rng)
.map_err(CrankError::InitialKeyGeneration)?;
assert!(
faulty * 3 < net_infos.len(),
"Too many faulty nodes requested, `f` must satisfy `3f < total_nodes`."
);
let mut steps = BTreeMap::new();
let mut messages = VecDeque::new();
let mut nodes = net_infos
.into_iter()
.enumerate()
.map(|(idx, (id, netinfo))| {
let is_faulty = idx < faulty;
let (algorithm, step) = cons(NewNodeInfo {
id: id.clone(),
netinfo,
faulty: is_faulty,
});
steps.insert(id.clone(), step);
(id, Node::new(algorithm, is_faulty))
})
.collect();
let mut message_count: usize = 0;
for (stepped_id, step) in &steps {
let n = process_step(
&mut nodes,
stepped_id.clone(),
step,
&mut messages,
error_on_fault,
)?;
message_count = message_count.saturating_add(n);
}
Ok((
VirtualNet {
nodes,
messages,
adversary: None,
trace: None,
crank_count: 0,
crank_limit: None,
message_count,
message_limit: None,
time_limit: None,
start_time: time::Instant::now(),
error_on_fault: true,
removed_nodes: BTreeSet::new(),
},
steps.into_iter().collect(),
))
}
#[inline]
pub fn dispatch_message<R: Rng>(
&mut self,
msg: NetMessage<D>,
rng: &mut R,
) -> Result<CpStep<D>, CrankError<D>> {
let node = self
.nodes
.get_mut(&msg.to)
.ok_or_else(|| CrankError::NodeDisappearedInDispatch(msg.to.clone()))?;
let msg_copy = msg.clone();
let step = node
.algorithm
.handle_message(&msg.from, msg.payload, rng)
.map_err(move |err| CrankError::HandleMessage { msg: msg_copy, err })?;
Ok(step)
}
#[inline]
pub fn send_input<R: Rng>(
&mut self,
id: D::NodeId,
input: D::Input,
rng: &mut R,
) -> Result<CpStep<D>, CrankError<D>> {
let step = self
.nodes
.get_mut(&id)
.expect("cannot handle input on non-existing node")
.algorithm
.handle_input(input, rng)
.map_err(CrankError::HandleInput)?;
self.process_step(id, &step)?;
Ok(step)
}
#[must_use = "The result of processing a step must be used."]
pub fn process_step(&mut self, id: D::NodeId, step: &CpStep<D>) -> Result<(), CrankError<D>> {
self.message_count = self.message_count.saturating_add(process_step(
&mut self.nodes,
id,
step,
&mut self.messages,
self.error_on_fault,
)?);
Ok(())
}
#[inline]
pub fn crank<R: Rng>(
&mut self,
rng: &mut R,
) -> Option<Result<(D::NodeId, CpStep<D>), CrankError<D>>> {
if let Some(limit) = self.crank_limit {
if self.crank_count >= limit {
return Some(Err(CrankError::CrankLimitExceeded(limit)));
}
}
if let Some(limit) = self.message_limit {
if self.message_count >= limit {
return Some(Err(CrankError::MessageLimitExceeded(limit)));
}
}
if let Some(limit) = self.time_limit {
if time::Instant::now().duration_since(self.start_time) > limit {
return Some(Err(CrankError::TimeLimitHit(limit)));
}
}
let mut adv = self.adversary.take();
if let Some(ref mut adversary) = adv {
adversary.pre_crank(adversary::NetMutHandle::new(self), rng)
}
self.adversary = adv;
let msg = loop {
let msg = self.messages.pop_front()?;
if !self.removed_nodes.contains(&msg.to) {
break msg;
}
};
net_trace!(
self,
"[{:?}] -> [{:?}]: {:?}\n",
msg.from,
msg.to,
msg.payload
);
let stepped_id = msg.to.clone();
let is_faulty = try_some!(self
.nodes
.get(&stepped_id)
.ok_or_else(|| CrankError::NodeDisappearedInCrank(msg.to.clone())))
.is_faulty();
let step: Step<_, _, _, _> = if is_faulty {
let mut adv = self.adversary.take();
let opt_tamper_result = adv.as_mut().map(|adversary| {
adversary.tamper(adversary::NetMutHandle::new(self), msg, rng)
});
self.adversary = adv;
try_some!(
opt_tamper_result.expect("No adversary defined (expected at least NullAdversary)")
)
} else {
try_some!(self.dispatch_message(msg, rng))
};
try_some!(self.process_step(stepped_id.clone(), &step));
self.crank_count += 1;
Some(Ok((stepped_id, step)))
}
pub fn crank_expect<R: Rng>(&mut self, rng: &mut R) -> (D::NodeId, CpStep<D>) {
self.crank(rng)
.expect("crank: network queue empty")
.expect("crank: node failed to process step")
}
}
impl<D, A> VirtualNet<D, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Input: Clone,
D::Output: Clone,
A: Adversary<D>,
{
#[inline]
pub fn broadcast_input<'a, R: Rng>(
&'a mut self,
input: &'a D::Input,
rng: &mut R,
) -> Result<Vec<(D::NodeId, CpStep<D>)>, CrankError<D>> {
let steps: Vec<_> = self
.nodes
.values_mut()
.map(move |node| {
Ok((
node.id().clone(),
node.algorithm
.handle_input(input.clone(), rng)
.map_err(CrankError::HandleInputAll)?,
))
})
.collect::<Result<_, _>>()?;
for (id, step) in &steps {
self.process_step(id.clone(), step)?;
}
Ok(steps)
}
}
impl<C, D, N, A> VirtualNet<D, A>
where
D: ConsensusProtocol<NodeId = N, Output = Batch<C, N>>,
D::Message: Clone,
A: Adversary<D>,
C: Contribution + Clone,
N: NodeIdT,
{
pub fn verify_batches<E>(&self, full_node: &Node<D>)
where
Batch<C, N>: SenderQueueableOutput<N, E>,
{
let mut participants: BTreeSet<N> = self.nodes().map(Node::id).cloned().collect();
let mut expected: BTreeMap<N, Vec<_>> = BTreeMap::new();
for batch in &full_node.outputs {
for id in &participants {
expected.entry(id.clone()).or_default().push(batch);
}
if let Some(new_participants) = batch.participant_change() {
participants = new_participants;
}
}
for node in self.correct_nodes().filter(|n| n.id() != full_node.id()) {
let id = node.id();
let actual_epochs: BTreeSet<_> =
node.outputs.iter().map(|batch| batch.epoch()).collect();
let expected_epochs: BTreeSet<_> =
expected[id].iter().map(|batch| batch.epoch()).collect();
assert_eq!(
expected_epochs, actual_epochs,
"Output epochs of {:?} don't match the expectation.",
id
);
assert_eq!(
node.outputs.len(),
expected[node.id()].len(),
"The output length of node {:?} is incorrect",
node.id()
);
assert!(node
.outputs
.iter()
.zip(
expected
.get(node.id())
.expect("outputs don't match the expectation")
)
.all(|(a, b)| a.public_eq(b)));
}
}
}
impl<D, A> ops::Index<D::NodeId> for VirtualNet<D, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Output: Clone,
A: Adversary<D>,
{
type Output = Node<D>;
#[inline]
fn index(&self, index: D::NodeId) -> &Self::Output {
self.get(index).expect("indexed node not found")
}
}
impl<D, A> ops::IndexMut<D::NodeId> for VirtualNet<D, A>
where
D: ConsensusProtocol,
D::Message: Clone,
D::Output: Clone,
A: Adversary<D>,
{
#[inline]
fn index_mut(&mut self, index: D::NodeId) -> &mut Self::Output {
self.get_mut(index).expect("indexed node not found")
}
}