// Copyright 2021 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.
//! A Deterministic Implementation of Byzantine Reliable Broadcast (BRB)
//!
//! BRB is a Byzantine Fault Tolerant (BFT) system for achieving network agreement over
//! eventually consistent data-type algorithms such as CRDTs.
//!
//! BRB ensures that we will never have a conflicting operation accepted by the network.
//!
//! BRB is similar in operation to a 2-phase-commit. It differs in that the underlying
//! algorithm decides the level of parallelism. The only constraints directly imposed by
//! BRB are that operations produced by an actor are processed in the order that operations
//! are created by the actor (source ordering) and that each operation is applied in the
//! network-agreed-upon generation in which it was created.
use std::collections::{BTreeMap, BTreeSet, HashMap};
use crate::brb_data_type::BRBDataType;
use crate::packet::{Packet, Payload};
use crate::{Error, ValidationError};
use log::info;
use brb_membership::{self, Actor, Generation, Sig, SigningActor};
use crdts::{CmRDT, Dot, VClock};
use serde::{Deserialize, Serialize};
/// DeterministicBRB -- the heart and soul of BRB.
///
/// Holds this process's membership state, the bookkeeping for msgs that are
/// still collecting signatures or delivery confirmations, the clocks tracking
/// what has been received and delivered from each source, the delivered-msg
/// history used to onboard peers, and the data type being secured.
#[derive(Debug)]
pub struct DeterministicBRB<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>> {
    /// Membership state for this process: our signing identity (`id`), the
    /// current generation (`gen`) and the voting members of each generation.
    pub membership: brb_membership::State<A, SA, S>,
    /// Msgs this process has initiated and is waiting on BFT agreement for from the network,
    /// mapped to the validation signatures collected so far (keyed by signer).
    pub pending_proof: HashMap<Msg<A, BRBDT::Op>, BTreeMap<A, S>>,
    /// Msgs this process has sent a ProofOfAgreement for but has not yet received a
    /// super-majority of delivery confirmations for. Each entry holds the proof that was
    /// broadcast and the set of actors that have confirmed delivery so far.
    #[allow(clippy::type_complexity)]
    pub pending_delivery: HashMap<Msg<A, BRBDT::Op>, (BTreeMap<A, S>, BTreeSet<A>)>,
    /// The clock representing the most recently received messages from each process.
    /// A message counted here has been acknowledged (e.g. we signed its validation
    /// request) but has not necessarily been delivered yet.
    /// This clock must at all times be greater than or equal to the `delivered` clock.
    pub received: VClock<A>,
    /// The clock representing the most recent msgs we've delivered to the underlying datatype `dt`.
    pub delivered: VClock<A>,
    /// Every delivered msg together with its proof of agreement, grouped by source actor
    /// and kept in delivery order. Replayed to peers during anti-entropy to onboard new members.
    #[allow(clippy::type_complexity)]
    pub history_from_source: BTreeMap<A, Vec<(Msg<A, BRBDT::Op>, BTreeMap<A, S>)>>,
    /// The state of the datatype that we are running BFT over.
    /// This can be the causal bank described in AT2, or it can be a CRDT.
    pub dt: BRBDT,
}
/// A BRB message consisting of an operation to be performed by the DataType we are
/// securing along with a Generation and a Dot indicating the context when it was created.
///
/// Validators sign the bincode encoding of a whole `Msg`, so changing these fields or
/// their order would presumably break signature checks against peers using the old layout.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Msg<A, DataTypeOp> {
    /// Generation (membership epoch) in which the Msg was created
    pub gen: Generation,
    /// DataType operation to apply once the Msg is delivered
    pub op: DataTypeOp,
    /// Dot of Msg creation: the source actor and its per-source sequence counter
    pub dot: Dot<A>,
}
/// An enumeration of BRB operations, in the order they occur during one round:
/// RequestValidation -> SignedValidated -> ProofOfAgreement -> Delivered.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Op<A: Ord, S, DataTypeOp> {
    /// Source Actor is requesting that a peer validate and sign an operation.
    RequestValidation {
        /// The message to be validated
        msg: Msg<A, DataTypeOp>,
    },
    /// Peer has validated and signed an operation, intended for return to Source Actor
    SignedValidated {
        /// The validated message
        msg: Msg<A, DataTypeOp>,
        /// The peer's signature over `msg`
        sig: S,
    },
    /// Source Actor is providing proof that a supermajority of members have signed and validated an op.
    ProofOfAgreement {
        /// the message being agreed upon
        msg: Msg<A, DataTypeOp>,
        /// Signatures over `msg`, keyed by the signing Actor.
        proof: BTreeMap<A, S>,
    },
    /// After a node receives ProofOfAgreement, it responds to the initiator with a Delivered packet
    /// to let it know that this message was successfully delivered.
    Delivered {
        /// the message that was delivered
        msg: Msg<A, DataTypeOp>,
    },
}
impl<A: Actor<S>, S: Sig, DataTypeOp> Payload<A, S, DataTypeOp> {
    /// Returns `true` only when this payload carries a BRB `Op::ProofOfAgreement`;
    /// every other BRB op and every non-BRB payload yields `false`.
    pub fn is_proof_of_agreement(&self) -> bool {
        if let Payload::BRB(brb_op) = self {
            matches!(brb_op, Op::ProofOfAgreement { .. })
        } else {
            false
        }
    }
}
impl<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>> Default
for DeterministicBRB<A, SA, S, BRBDT>
{
/// returns a default DeterministicBRB
fn default() -> Self {
Self::new()
}
}
impl<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>>
DeterministicBRB<A, SA, S, BRBDT>
{
/// returns a new DeterministicBRB
pub fn new() -> Self {
let membership: brb_membership::State<A, SA, S> = Default::default();
let dt = BRBDT::new(membership.id.actor());
Self {
membership,
dt,
pending_proof: Default::default(),
pending_delivery: Default::default(),
delivered: Default::default(),
received: Default::default(),
history_from_source: Default::default(),
}
}
/// The Actor identifying this process, taken from our membership identity.
pub fn actor(&self) -> A {
    let id = &self.membership.id;
    id.actor()
}
/// returns a set of known peers
pub fn peers(&self) -> Result<BTreeSet<A>, Error<A, S, BRBDT::ValidationError>> {
self.membership
.members(self.membership.gen)
.map_err(Error::Membership)
}
/// Adds `peer` to the local voting group directly, bypassing the usual
/// brb_membership join proposal and vote.
pub fn force_join(&mut self, peer: A) {
    let me = self.actor();
    info!("[BRB] {:?} is forcing {:?} to join", me, peer);
    self.membership.force_join(peer);
}
/// Removes `peer` from the local voting group directly, bypassing the usual
/// brb_membership leave proposal and vote.
pub fn force_leave(&mut self, peer: A) {
    let me = self.actor();
    info!("[BRB] {:?} is forcing {:?} to leave", me, peer);
    self.membership.force_leave(peer);
}
/// Proposes that `actor` join the voting group.
///
/// The proposer must already be a voting member, so in practice it sponsors a
/// different, not-yet-voting actor: a node cannot propose to add itself.
///
/// Returns one membership-vote packet per recipient chosen by brb_membership.
#[allow(clippy::type_complexity)]
pub fn request_membership(
    &mut self,
    actor: A,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let votes = self
        .membership
        .propose(brb_membership::Reconfig::Join(actor))?;
    let mut packets = Vec::new();
    for vote_msg in votes {
        let payload = Payload::Membership(Box::new(vote_msg.vote));
        packets.push(self.send(vote_msg.dest, payload)?);
    }
    Ok(packets)
}
/// Proposes that `actor` be removed from the voting group.
///
/// The proposer must already be a voting member; it may propose removing
/// itself or any other member.
///
/// See <https://github.com/maidsafe/brb/issues/18>
#[allow(clippy::type_complexity)]
pub fn kill_peer(
    &mut self,
    actor: A,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let votes = self
        .membership
        .propose(brb_membership::Reconfig::Leave(actor))?;
    let mut packets = Vec::new();
    for vote_msg in votes {
        let payload = Payload::Membership(Box::new(vote_msg.vote));
        packets.push(self.send(vote_msg.dest, payload)?);
    }
    Ok(packets)
}
/// Builds an AntiEntropy packet for `peer` carrying our current generation
/// and our `delivered` clock.
///
/// The peer is expected to reply with whatever history we are missing. When
/// we have seen nothing yet, this doubles as a way to bootstrap from the
/// "genesis" generation.
#[allow(clippy::type_complexity)]
pub fn anti_entropy(
    &self,
    peer: A,
) -> Result<Packet<A, S, BRBDT::Op>, Error<A, S, BRBDT::ValidationError>> {
    self.send(
        peer,
        Payload::AntiEntropy {
            generation: self.membership.gen,
            delivered: self.delivered.clone(),
        },
    )
}
/// Re-broadcasts the ProofOfAgreement for every msg still awaiting delivery
/// confirmations, addressed only to members of the msg's generation that have
/// not confirmed yet.
#[allow(clippy::type_complexity)]
pub fn resend_pending_deliveries(
    &self,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let mut out = Vec::new();
    for (msg, entry) in &self.pending_delivery {
        let (proof, confirmed) = entry;
        let members = self.membership.members(msg.gen)?;
        let payload = Payload::BRB(Op::ProofOfAgreement {
            msg: msg.clone(),
            proof: proof.clone(),
        });
        out.extend(self.broadcast(&payload, &members - confirmed)?);
    }
    Ok(out)
}
/// Resend any RequestValidation packets that have not yet received enough signatures.
#[allow(clippy::type_complexity)]
pub fn resend_pending_validation_requests(
&self,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
let mut packets = Vec::new();
for (msg, sigs) in self.pending_proof.iter() {
let recipients = &self.membership.members(msg.gen)? - &sigs.keys().cloned().collect();
packets.extend(self.broadcast(
&Payload::BRB(Op::RequestValidation { msg: msg.clone() }),
recipients,
)?)
}
Ok(packets)
}
/// Re-sends everything still awaiting a response.
///
/// The result holds pending validation requests first, then pending
/// proof-of-agreement deliveries.
#[allow(clippy::type_complexity)]
pub fn resend_pending_msgs(
    &self,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let requests = self.resend_pending_validation_requests()?;
    let proofs = self.resend_pending_deliveries()?;
    Ok(requests.into_iter().chain(proofs).collect())
}
/// Initiates the BRB process for an operation on the BRBDataType.
///
/// Returns BRB packets to be delivered to other network members. Packets
/// destined to ourselves are short-circuited and handled to completion before
/// this method returns.
///
/// This short-circuiting is done to remove some potential race-conditions when
/// BRB is integrated into a highly concurrent application where multiple threads
/// are concurrently fighting to initiate BRB operations.
///
/// NOTE: Network members will refuse to sign multiple operations from a
/// source concurrently. It's recommended to ensure there aren't any
/// pending deliveries before you initiate a new operation to reduce
/// the chance of this happening.
/// A naive implementation of this would be:
///
/// ```ignore
/// let mut packets_to_resend = brb.resend_pending_msgs()?;
///
/// while !packets_to_resend.is_empty() {
///     // ... re-send these packets
///     network.send_packets(packets_to_resend);
///     sleep(TIMEOUT_SECONDS);
///     packets_to_resend = brb.resend_pending_msgs()?;
/// }
///
/// brb.exec_op(op)?;
/// ```
#[allow(clippy::type_complexity)]
pub fn exec_op(
    &mut self,
    op: BRBDT::Op,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let me = self.actor();
    let msg = Msg {
        op,
        gen: self.membership.gen,
        // Take the next dot from the `received` clock (not `delivered`) so several
        // operations from this process may be pending agreement at the same time.
        dot: self.received.inc(me),
    };
    info!("[BRB] {} initiating bft for msg {:?}", me, msg);

    let request = Payload::BRB(Op::RequestValidation { msg });
    let peers = self.peers()?;

    // Packets addressed to ourselves go on a LIFO work stack and are handled
    // immediately; everything else is returned for the transport to send.
    let mut outgoing = Vec::new();
    let mut loopback = Vec::new();
    for packet in self.broadcast(&request, peers)? {
        if packet.dest == me {
            loopback.push(packet);
        } else {
            outgoing.push(packet);
        }
    }

    while let Some(packet) = loopback.pop() {
        for reply in self.handle_packet(packet)? {
            if reply.dest == me {
                loopback.push(reply);
            } else {
                outgoing.push(reply);
            }
        }
    }

    Ok(outgoing)
}
/// Handles an incoming BRB Packet: verifies and validates it, then processes it.
///
/// Returns the packets to send in response.
///
/// # Errors
/// Fails if the packet signature or contents are invalid, or if processing fails.
#[allow(clippy::type_complexity)]
pub fn handle_packet(
    &mut self,
    packet: Packet<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    let me = self.actor();
    info!("[BRB] handling packet from {}->{}", packet.source, me);
    self.validate_packet(&packet)
        .and_then(|()| self.process_packet(packet))
}
/// processes an incoming BRB Packet after it has been validated.
#[allow(clippy::type_complexity)]
fn process_packet(
&mut self,
packet: Packet<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
let source = packet.source;
match packet.payload {
Payload::AntiEntropy {
generation,
delivered,
} => {
let mut packets_to_send = self
.membership
.anti_entropy(generation, source)
.into_iter()
.map(|vote_msg| {
self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote)))
})
.collect::<Result<Vec<_>, _>>()?;
for (actor, msgs) in self.history_from_source.iter() {
let seen_counter = delivered.get(actor);
packets_to_send.extend(
// TODO: This can be optimized using Vec::binary_search. This is linear in the number of messages.
msgs.iter()
.filter(|(msg, _proof)| msg.dot.counter > seen_counter)
.map(|(msg, proof)| {
self.send(
source,
Payload::BRB(Op::ProofOfAgreement {
msg: msg.clone(),
proof: proof.clone(),
}),
)
})
.collect::<Result<Vec<_>, _>>()?,
);
}
Ok(packets_to_send)
}
Payload::BRB(op) => self.process_brb_op(packet.source, op),
Payload::Membership(boxed_vote) => self
.membership
.handle_vote(*boxed_vote)
.map_err(Error::Membership)?
.into_iter()
.map(|vote_msg| {
self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote)))
})
.collect(),
}
}
/// Processes an incoming BRB operation that has already passed `validate_brb_op`.
///
/// * `RequestValidation`: advance `received` and reply to `source` with our signature.
/// * `SignedValidated`: record the signature. The first time a supermajority is
///   reached, move the msg to `pending_delivery` and broadcast the proof (including
///   to ourselves).
/// * `ProofOfAgreement`: advance the clocks, log the msg in history, drop any
///   pending-proof entry, apply the op to `dt`, and tell the msg's source it was delivered.
/// * `Delivered`: record the confirmation, and drop the msg from `pending_delivery`
///   once a supermajority has confirmed.
///
/// # Errors
/// Fails if signing or serializing a response fails, or if the members of the
/// msg's generation cannot be resolved.
#[allow(clippy::type_complexity)]
fn process_brb_op(
    &mut self,
    source: A,
    op: Op<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    match op {
        Op::RequestValidation { msg } => {
            info!("[BRB] request for validation");
            self.received.apply(msg.dot);
            // NOTE: we do not need to store this message, it will be sent back to us
            // with the proof of agreement. Our signature will prevent tampering.
            let sig = self.sign(&msg)?;
            let validation = Op::SignedValidated { msg, sig };
            Ok(vec![self.send(source, Payload::BRB(validation))?])
        }
        Op::SignedValidated { msg, sig } => {
            info!("[BRB] signed validated");
            // A signature arriving after we already delivered this msg is useless.
            // Recording it would recreate a `pending_proof` entry that nothing ever
            // removes, so `resend_pending_validation_requests` would keep re-sending
            // requests for an already-delivered msg.
            if self.delivered.get(&msg.dot.actor) >= msg.dot.counter {
                return Ok(vec![]);
            }
            let sigs = self.pending_proof.entry(msg.clone()).or_default();
            let is_new_signer = sigs.insert(source, sig).is_none();
            let num_signatures = sigs.len();
            // Broadcast the proof only at the moment we first cross supermajority.
            // The signer must be new: a duplicate does not change the count, and
            // without this check it would re-trigger the broadcast and wipe the
            // delivery confirmations already collected in `pending_delivery`.
            // (num_signatures - 1) must also not already have been a supermajority.
            if is_new_signer
                && self.supermajority(num_signatures, msg.gen)?
                && !self.supermajority(num_signatures - 1, msg.gen)?
            {
                info!("[BRB] we have supermajority over msg, sending proof to network");
                // We have supermajority, broadcast proof of agreement to network
                let proof = self.pending_proof[&msg].clone();
                self.pending_delivery
                    .insert(msg.clone(), (proof.clone(), Default::default()));
                // Add ourselves to the broadcast recipients since we may have initiated this request
                // while we were not yet an accepted member of the network.
                // e.g. this happens if we request to join the network.
                let mut recipients = self.membership.members(msg.gen)?;
                recipients.insert(self.actor());
                self.broadcast(
                    &Payload::BRB(Op::ProofOfAgreement { msg, proof }),
                    recipients,
                )
            } else {
                Ok(vec![])
            }
        }
        Op::ProofOfAgreement { msg, proof } => {
            info!("[BRB] proof of agreement: {:?}", msg);
            // We may not have been in the subset of members to validate this clock
            // so we may not have had the chance to increment received. We must bring
            // received up to this msg's timestamp.
            //
            // Otherwise we won't be able to validate any future messages
            // from this source.
            self.received.apply(msg.dot);
            self.delivered.apply(msg.dot);
            // Log this op in our history with proof
            self.history_from_source
                .entry(msg.dot.actor)
                .or_default()
                .push((msg.clone(), proof));
            // Remove the message from pending_proof since we have a proof of agreement
            // NOTE: this is a no-op for most members, only the initiating member will have
            // the message in its pending_proof set.
            self.pending_proof.remove(&msg);
            // Apply the op
            self.dt.apply(msg.op.clone());
            Ok(vec![self.send(
                msg.dot.actor,
                Payload::BRB(Op::Delivered { msg }),
            )?])
        }
        Op::Delivered { msg } => {
            let number_of_confirms =
                if let Some((_proof, confirms)) = self.pending_delivery.get_mut(&msg) {
                    confirms.insert(source);
                    confirms.len()
                } else {
                    0
                };
            if self.supermajority(number_of_confirms, msg.gen)? {
                // We've seen a super-majority of delivery confirmations so we can
                // be confident this operation has been committed.
                self.pending_delivery.remove(&msg);
            }
            Ok(vec![])
        }
    }
}
/// Checks that `packet` is signed by its claimed source, then validates its payload.
///
/// # Errors
/// Propagates the first signature or payload validation failure.
fn validate_packet(
    &self,
    packet: &Packet<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
    self.verify(&packet.payload, &packet.source, &packet.sig)
        .and_then(|()| self.validate_payload(packet.source, &packet.payload))
}
/// Validates a Payload sent by `from`.
///
/// Only BRB ops are checked here. AntiEntropy requests are always accepted, and
/// membership votes are validated later inside `membership.handle_vote(..)`.
fn validate_payload(
    &self,
    from: A,
    payload: &Payload<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
    match payload {
        Payload::BRB(brb_op) => self.validate_brb_op(from, brb_op),
        Payload::AntiEntropy { .. } | Payload::Membership(_) => Ok(()),
    }
}
/// Validates a BRB operation received from `from` against our local state.
///
/// * `RequestValidation`: all of the following must hold:
///   - `from` is the msg's source;
///   - the msg's dot is the next dot we expect from `from`, in both `received` and
///     `delivered` (i.e. `from` has no other msg pending);
///   - the msg belongs to our current generation;
///   - `from` is a voting member of that generation;
///   - the data type accepts the op.
/// * `SignedValidated`: the signature must verify, and the msg must be one we sourced.
/// * `ProofOfAgreement`: the msg must be the next one to deliver from its source, and
///   the proof must hold a supermajority of valid signatures, all from members of
///   the msg's generation.
/// * `Delivered`: the msg must be one we sourced and are still awaiting confirmations for.
///
/// # Errors
/// Returns `Error::Validation` for the first failed check. Membership lookup and
/// serialization errors are propagated. A bad `SignedValidated` signature is
/// propagated as the verification error itself.
fn validate_brb_op(
    &self,
    from: A,
    op: &Op<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
    match op {
        Op::RequestValidation { msg } => {
            if from != msg.dot.actor {
                Err(ValidationError::PacketSourceIsNotDot { from, dot: msg.dot })
            } else if msg.dot != self.received.inc(from) {
                Err(ValidationError::MsgDotNotTheNextDot {
                    msg_dot: msg.dot,
                    expected_dot: self.received.inc(from),
                })
            } else if msg.dot != self.delivered.inc(from) {
                Err(ValidationError::SourceAlreadyHasPendingMsg {
                    msg_dot: msg.dot,
                    next_deliver_dot: self.delivered.inc(from),
                })
            } else if msg.gen != self.membership.gen {
                Err(ValidationError::MessageFromDifferentGeneration {
                    msg_gen: msg.gen,
                    gen: self.membership.gen,
                })
            } else {
                // Look the members up once, only after the cheaper checks pass, and
                // reuse the set for the error instead of querying membership again.
                let members = self.membership.members(self.membership.gen)?;
                if !members.contains(&from) {
                    Err(ValidationError::SourceIsNotVotingMember { from, members })
                } else {
                    self.dt
                        .validate(&from, &msg.op)
                        .map_err(ValidationError::DataTypeFailedValidation)
                }
            }
        }
        Op::SignedValidated { msg, sig } => {
            self.verify(msg, &from, sig)?;
            if self.actor() != msg.dot.actor {
                Err(ValidationError::SignedValidatedForPacketWeDidNotRequest)
            } else {
                Ok(())
            }
        }
        Op::ProofOfAgreement { msg, proof } => {
            let msg_members = self.membership.members(msg.gen)?;
            if self.delivered.inc(msg.dot.actor) != msg.dot {
                Err(ValidationError::MsgDotNotNextDotToBeDelivered {
                    msg_dot: msg.dot,
                    expected_dot: self.delivered.inc(msg.dot.actor),
                })
            } else if !self.supermajority(proof.len(), msg.gen)? {
                Err(ValidationError::NotEnoughSignaturesToFormQuorum)
            } else if !proof.keys().all(|signer| msg_members.contains(signer)) {
                Err(ValidationError::ProofContainsSignaturesFromNonMembers)
            } else if proof
                .iter()
                .any(|(signer, sig)| self.verify(msg, signer, sig).is_err())
            {
                Err(ValidationError::ProofContainsInvalidSignatures)
            } else {
                Ok(())
            }
        }
        Op::Delivered { msg } => {
            if msg.dot.actor != self.actor() {
                Err(ValidationError::DeliveredForPacketWeDidNotInitiate)
            } else if !self.pending_delivery.contains_key(msg) {
                Err(ValidationError::DeliveredForPacketWeAreNotWaitingOn)
            } else {
                Ok(())
            }
        }
    }
    .map_err(Error::Validation)
}
/// Returns whether `n` votes form a supermajority (strictly more than two thirds)
/// of the members of generation `gen`.
///
/// # Errors
/// Fails if the members of `gen` cannot be resolved.
fn supermajority(
    &self,
    n: usize,
    gen: Generation,
) -> Result<bool, Error<A, S, BRBDT::ValidationError>> {
    let voters = self.membership.members(gen)?.len();
    Ok(3 * n > 2 * voters)
}
/// Builds one signed packet carrying a copy of `payload` for every actor in
/// `targets`, in the set's iteration order, ready for the transport layer.
///
/// # Errors
/// Stops at the first packet that cannot be signed.
#[allow(clippy::type_complexity)]
fn broadcast(
    &self,
    payload: &Payload<A, S, BRBDT::Op>,
    targets: BTreeSet<A>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
    info!("[BRB] broadcasting {}->{:?}", self.actor(), targets);
    let mut packets = Vec::with_capacity(targets.len());
    for dest in targets {
        packets.push(self.send(dest, payload.clone())?);
    }
    Ok(packets)
}
/// Wraps `payload` in a packet from us to `dest`, signed over the payload.
///
/// # Errors
/// Fails if the payload cannot be serialized for signing.
#[allow(clippy::type_complexity)]
fn send(
    &self,
    dest: A,
    payload: Payload<A, S, BRBDT::Op>,
) -> Result<Packet<A, S, BRBDT::Op>, Error<A, S, BRBDT::ValidationError>> {
    let sig = self.sign(&payload)?;
    let source = self.actor();
    Ok(Packet {
        source,
        dest,
        sig,
        payload,
    })
}
/// Serializes `data` with bincode and signs the bytes with our identity's key.
fn sign(&self, data: impl Serialize) -> Result<S, Error<A, S, BRBDT::ValidationError>> {
    Ok(self.membership.id.sign(&bincode::serialize(&data)?))
}
/// Checks that `sig` is `signer`'s valid signature over the bincode encoding of `data`.
///
/// # Errors
/// Fails if `data` cannot be serialized or the signature does not verify.
fn verify(
    &self,
    data: impl Serialize,
    signer: &A,
    sig: &S,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
    let payload_bytes = bincode::serialize(&data)?;
    signer.verify(&payload_bytes, sig)?;
    Ok(())
}
}