use std::collections::{BTreeMap, BTreeSet, HashMap};
use crate::brb_data_type::BRBDataType;
use crate::packet::{Packet, Payload};
use crate::{Error, ValidationError};
use log::info;
use brb_membership::{self, Actor, Generation, Sig, SigningActor};
use crdts::{CmRDT, Dot, VClock};
use serde::{Deserialize, Serialize};
#[derive(Debug)]
pub struct DeterministicBRB<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>> {
pub membership: brb_membership::State<A, SA, S>,
pub pending_proof: HashMap<Msg<A, BRBDT::Op>, BTreeMap<A, S>>,
pub received: VClock<A>,
pub delivered: VClock<A>,
#[allow(clippy::type_complexity)]
pub history_from_source: BTreeMap<A, Vec<(Msg<A, BRBDT::Op>, BTreeMap<A, S>)>>,
pub dt: BRBDT,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Msg<A, DataTypeOp> {
gen: Generation,
op: DataTypeOp,
dot: Dot<A>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Op<A: Ord, S, DataTypeOp> {
RequestValidation {
msg: Msg<A, DataTypeOp>,
},
SignedValidated {
msg: Msg<A, DataTypeOp>,
sig: S,
},
ProofOfAgreement {
msg: Msg<A, DataTypeOp>,
proof: BTreeMap<A, S>,
},
}
impl<A: Actor<S>, S: Sig, DataTypeOp> Payload<A, S, DataTypeOp> {
pub fn is_proof_of_agreement(&self) -> bool {
matches!(self, Payload::BRB(Op::ProofOfAgreement { .. }))
}
}
impl<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>> Default
for DeterministicBRB<A, SA, S, BRBDT>
{
fn default() -> Self {
Self::new()
}
}
impl<A: Actor<S>, SA: SigningActor<A, S>, S: Sig, BRBDT: BRBDataType<A>>
DeterministicBRB<A, SA, S, BRBDT>
{
pub fn new() -> Self {
let membership: brb_membership::State<A, SA, S> = Default::default();
let dt = BRBDT::new(membership.id.actor());
Self {
membership,
dt,
pending_proof: Default::default(),
delivered: Default::default(),
received: Default::default(),
history_from_source: Default::default(),
}
}
pub fn actor(&self) -> A {
self.membership.id.actor()
}
pub fn peers(&self) -> Result<BTreeSet<A>, Error<A, S, BRBDT::ValidationError>> {
self.membership
.members(self.membership.gen)
.map_err(Error::Membership)
}
pub fn force_join(&mut self, peer: A) {
info!("[BRB] {:?} is forcing {:?} to join", self.actor(), peer);
self.membership.force_join(peer);
}
pub fn force_leave(&mut self, peer: A) {
info!("[BRB] {:?} is forcing {:?} to leave", self.actor(), peer);
self.membership.force_leave(peer);
}
#[allow(clippy::type_complexity)]
pub fn request_membership(
&mut self,
actor: A,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
self.membership
.propose(brb_membership::Reconfig::Join(actor))?
.into_iter()
.map(|vote_msg| self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote))))
.collect()
}
#[allow(clippy::type_complexity)]
pub fn kill_peer(
&mut self,
actor: A,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
self.membership
.propose(brb_membership::Reconfig::Leave(actor))?
.into_iter()
.map(|vote_msg| self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote))))
.collect()
}
#[allow(clippy::type_complexity)]
pub fn anti_entropy(
&self,
peer: A,
) -> Result<Packet<A, S, BRBDT::Op>, Error<A, S, BRBDT::ValidationError>> {
let payload = Payload::AntiEntropy {
generation: self.membership.gen,
delivered: self.delivered.clone(),
};
self.send(peer, payload)
}
#[allow(clippy::type_complexity)]
pub fn exec_op(
&self,
op: BRBDT::Op,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
let msg = Msg {
op,
gen: self.membership.gen,
dot: self.received.inc(self.actor()),
};
info!("[BRB] {} initiating bft for msg {:?}", self.actor(), msg);
self.broadcast(&Payload::BRB(Op::RequestValidation { msg }), self.peers()?)
}
#[allow(clippy::type_complexity)]
pub fn handle_packet(
&mut self,
packet: Packet<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
info!(
"[BRB] handling packet from {}->{}",
packet.source,
self.actor()
);
self.validate_packet(&packet)?;
self.process_packet(packet)
}
#[allow(clippy::type_complexity)]
fn process_packet(
&mut self,
packet: Packet<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
let source = packet.source;
match packet.payload {
Payload::AntiEntropy {
generation,
delivered,
} => {
let mut packets_to_send = self
.membership
.anti_entropy(generation, source)
.into_iter()
.map(|vote_msg| {
self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote)))
})
.collect::<Result<Vec<_>, _>>()?;
for (actor, msgs) in self.history_from_source.iter() {
let seen_counter = delivered.get(actor);
packets_to_send.extend(
msgs.iter()
.filter(|(msg, _proof)| msg.dot.counter > seen_counter)
.map(|(msg, proof)| {
self.send(
source,
Payload::BRB(Op::ProofOfAgreement {
msg: msg.clone(),
proof: proof.clone(),
}),
)
})
.collect::<Result<Vec<_>, _>>()?,
);
}
Ok(packets_to_send)
}
Payload::BRB(op) => self.process_brb_op(packet.source, op),
Payload::Membership(boxed_vote) => self
.membership
.handle_vote(*boxed_vote)
.map_err(Error::Membership)?
.into_iter()
.map(|vote_msg| {
self.send(vote_msg.dest, Payload::Membership(Box::new(vote_msg.vote)))
})
.collect(),
}
}
#[allow(clippy::type_complexity)]
fn process_brb_op(
&mut self,
source: A,
op: Op<A, S, BRBDT::Op>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
match op {
Op::RequestValidation { msg } => {
info!("[BRB] request for validation");
self.received.apply(msg.dot);
let sig = self.sign(&msg)?;
let validation = Op::SignedValidated { msg, sig };
Ok(vec![self.send(source, Payload::BRB(validation))?])
}
Op::SignedValidated { msg, sig } => {
info!("[BRB] signed validated");
self.pending_proof
.entry(msg.clone())
.or_default()
.insert(source, sig);
let num_signatures = self.pending_proof[&msg].len();
if self.supermajority(num_signatures, msg.gen)?
&& !self.supermajority(num_signatures - 1, msg.gen)?
{
info!("[BRB] we have supermajority over msg, sending proof to network");
let proof = self.pending_proof[&msg].clone();
let recipients = &self.membership.members(msg.gen).unwrap()
| &vec![self.actor()].into_iter().collect();
self.broadcast(
&Payload::BRB(Op::ProofOfAgreement { msg, proof }),
recipients,
)
} else {
Ok(vec![])
}
}
Op::ProofOfAgreement { msg, proof } => {
info!("[BRB] proof of agreement: {:?}", msg);
self.received.apply(msg.dot);
self.delivered.apply(msg.dot);
self.history_from_source
.entry(msg.dot.actor)
.or_default()
.push((msg.clone(), proof));
self.pending_proof.remove(&msg);
self.dt.apply(msg.op);
Ok(vec![])
}
}
}
fn validate_packet(
&self,
packet: &Packet<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
self.verify(&packet.payload, &packet.source, &packet.sig)?;
self.validate_payload(packet.source, &packet.payload)
}
fn validate_payload(
&self,
from: A,
payload: &Payload<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
match payload {
Payload::AntiEntropy { .. } => Ok(()),
Payload::BRB(op) => self.validate_brb_op(from, op),
Payload::Membership(_) => Ok(()),
}
}
fn validate_brb_op(
&self,
from: A,
op: &Op<A, S, BRBDT::Op>,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
match op {
Op::RequestValidation { msg } => {
if from != msg.dot.actor {
Err(ValidationError::PacketSourceIsNotDot { from, dot: msg.dot })
} else if msg.dot != self.received.inc(from) {
Err(ValidationError::MsgDotNotTheNextDot {
msg_dot: msg.dot,
expected_dot: self.received.inc(from),
})
} else if msg.dot != self.delivered.inc(from) {
Err(ValidationError::SourceAlreadyHasPendingMsg {
msg_dot: msg.dot,
next_deliver_dot: self.delivered.inc(from),
})
} else if msg.gen != self.membership.gen {
Err(ValidationError::MessageFromDifferentGeneration {
msg_gen: msg.gen,
gen: self.membership.gen,
})
} else if !self
.membership
.members(self.membership.gen)?
.contains(&from)
{
Err(ValidationError::SourceIsNotVotingMember {
from,
members: self.membership.members(self.membership.gen)?,
})
} else {
self.dt
.validate(&from, &msg.op)
.map_err(ValidationError::DataTypeFailedValidation)
}
}
Op::SignedValidated { msg, sig } => {
self.verify(&msg, &from, sig)?;
if self.actor() != msg.dot.actor {
Err(ValidationError::SignedValidatedForPacketWeDidNotRequest)
} else {
Ok(())
}
}
Op::ProofOfAgreement { msg, proof } => {
let msg_members = self.membership.members(msg.gen)?;
if self.delivered.inc(msg.dot.actor) != msg.dot {
Err(ValidationError::MsgDotNotNextDotToBeDelivered {
msg_dot: msg.dot,
expected_dot: self.delivered.inc(msg.dot.actor),
})
} else if !self.supermajority(proof.len(), msg.gen)? {
Err(ValidationError::NotEnoughSignaturesToFormQuorum)
} else if !proof
.iter()
.all(|(signer, _)| msg_members.contains(&signer))
{
Err(ValidationError::ProofContainsSignaturesFromNonMembers)
} else if proof
.iter()
.map(|(signer, sig)| self.verify(&msg, &signer, &sig))
.collect::<Result<Vec<()>, _>>()
.is_err()
{
Err(ValidationError::ProofContainsInvalidSignatures)
} else {
Ok(())
}
}
}
.map_err(Error::Validation)
}
fn supermajority(
&self,
n: usize,
gen: Generation,
) -> Result<bool, Error<A, S, BRBDT::ValidationError>> {
Ok(n * 3 > self.membership.members(gen)?.len() * 2)
}
#[allow(clippy::type_complexity)]
fn broadcast(
&self,
payload: &Payload<A, S, BRBDT::Op>,
targets: BTreeSet<A>,
) -> Result<Vec<Packet<A, S, BRBDT::Op>>, Error<A, S, BRBDT::ValidationError>> {
info!("[BRB] broadcasting {}->{:?}", self.actor(), targets);
targets
.into_iter()
.map(|dest_p| self.send(dest_p, payload.clone()))
.collect()
}
#[allow(clippy::type_complexity)]
fn send(
&self,
dest: A,
payload: Payload<A, S, BRBDT::Op>,
) -> Result<Packet<A, S, BRBDT::Op>, Error<A, S, BRBDT::ValidationError>> {
let sig = self.sign(&payload)?;
Ok(Packet {
source: self.actor(),
dest,
payload,
sig,
})
}
fn sign(&self, data: impl Serialize) -> Result<S, Error<A, S, BRBDT::ValidationError>> {
let bytes = bincode::serialize(&data)?;
Ok(self.membership.id.sign(&bytes))
}
fn verify(
&self,
data: impl Serialize,
signer: &A,
sig: &S,
) -> Result<(), Error<A, S, BRBDT::ValidationError>> {
let bytes = bincode::serialize(&data)?;
signer.verify(&bytes, &sig)?;
Ok(())
}
}