use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fmt;
use std::time::Duration;
use std::{cmp::Ord, collections::HashSet, fmt::Debug, ops::Add};
use anyhow::Result;
pub use config::BackoffMultiplier;
pub use config::Config;
use derivative::Derivative;
pub use error::Error;
pub use output::Output;
pub use peer_message::PeerMessage;
use peer_message_processor::collector::collector_checkpoints::CollectorCheckpoints;
use peer_message_processor::collector::collector_commits::CollectorCommits;
use peer_message_processor::collector::collector_req_view_changes::CollectorReqViewChanges;
use peer_message_processor::collector::collector_view_changes::CollectorViewChanges;
use request_processor::RequestProcessor;
use shared_ids::{ClientId, ReplicaId, RequestId};
use timeout::TimeoutType;
use tracing::{debug, error, error_span, info, trace, warn};
use usig::Count;
use usig::{Counter, Usig};
use serde::{Deserialize, Serialize};
use peer_message::{
usig_message::{checkpoint::CheckpointCertificate, UsigMessage},
ValidatedPeerMessage,
};
use usig_msg_order_enforcer::UsigMsgOrderEnforcer;
use crate::{
client_request::ClientRequest,
output::NotReflectedOutput,
peer_message::{
req_view_change::ReqViewChange,
usig_message::view_peer_message::prepare::{Prepare, PrepareContent},
},
};
mod config;
mod error;
mod peer_message;
mod peer_message_processor;
mod usig_msg_order_enforcer;
pub mod id;
pub mod output;
pub mod timeout;
mod client_request;
mod request_processor;
#[cfg(test)]
mod tests;
/// A [`BinaryHeap`] whose elements are wrapped in [`Reverse`], so that the
/// smallest element (according to `Ord`) is the one popped first.
pub type MinHeap<T> = BinaryHeap<Reverse<T>>;
/// Factor by which the current timeout duration is multiplied every time a
/// view-change times out (see the handling of `TimeoutType::ViewChange`).
const BACKOFF_MULTIPLIER: u8 = 2;
/// Payload of a client request that can be handled by [`MinBft`].
///
/// Implementors must be cloneable, (de)serializable and debuggable.
pub trait RequestPayload: Clone + Serialize + for<'a> Deserialize<'a> + Debug {
/// Returns the ID of this request.
fn id(&self) -> RequestId;
/// Verifies this request with respect to the client with the given ID.
///
/// # Errors
///
/// Returns an error if the request is not valid. `MinBft::handle_client_message`
/// rejects such a request and reports `Error::Request` in its output.
fn verify(&self, id: ClientId) -> Result<()>;
}
/// Number identifying a view of the protocol.
///
/// Views are totally ordered by their number; the default view is `View(0)`.
#[derive(
    Clone, Copy, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
struct View(u64);
/// Advances a [`View`] by a number of views.
impl Add<u64> for View {
    type Output = Self;

    /// Returns the view that lies `rhs` views after `self`.
    ///
    /// # Panics
    ///
    /// Panics if the resulting view number does not fit into a `u64`.
    /// The check is explicit so that release builds fail loudly as well,
    /// instead of silently wrapping around to a small view number.
    fn add(self, rhs: u64) -> Self::Output {
        let next = self.0.checked_add(rhs).expect("View number overflow");
        Self(next)
    }
}
impl fmt::Display for View {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "({0})", self.0)
}
}
/// State of a replica that is currently operating inside a view.
#[derive(Debug)]
struct InView<P, Sig> {
// The view the replica is currently in.
view: View,
// Set once the replica has broadcast a ReqViewChange for this view (done on
// a client timeout), so that the request is not broadcast a second time.
has_requested_view_change: bool,
// Collector of Commits for this view.
// NOTE(review): presumably gathers Commits until a request batch is accepted — confirm.
collector_commits: CollectorCommits<P, Sig>,
}
/// State of a replica that is currently changing from one view to another.
#[derive(Debug)]
struct ChangeInProgress {
// The view the replica is changing from.
prev_view: View,
// The view the replica is changing to.
next_view: View,
// Reset to `false` whenever the view-change times out.
// NOTE(review): presumably set once the replica has broadcast its ViewChange — confirm.
has_broadcast_view_change: bool,
}
/// The two states a replica can be in with respect to views: either it is
/// inside a view, or it is in the middle of changing views.
#[derive(Debug)]
enum ViewState<P, Sig> {
// The replica is inside a view.
InView(InView<P, Sig>),
// The replica is changing views.
ChangeInProgress(ChangeInProgress),
}
impl<P: Clone, Sig: Counter + Clone> ViewState<P, Sig> {
    /// Creates the initial view state: the replica is inside the default view,
    /// has not requested a view-change, and starts with a fresh commit collector.
    fn new() -> Self {
        let initial = InView {
            view: View::default(),
            has_requested_view_change: false,
            collector_commits: CollectorCommits::new(),
        };
        Self::InView(initial)
    }
}
/// State kept per replica of the system (one instance is created for every
/// replica listed by the configuration).
#[derive(Clone, Debug, Derivative)]
#[derivative(Default(bound = "Sig: Counter"))]
struct ReplicaState<P, Sig> {
// NOTE(review): presumably makes sure USIG-signed messages of this replica
// are processed in the order of their counters — confirm against
// `usig_msg_order_enforcer`.
usig_message_order_enforcer: UsigMsgOrderEnforcer<P, Sig>,
}
/// A replica of the MinBFT protocol.
///
/// It is driven from the outside through `handle_client_message`,
/// `handle_peer_message` and `handle_timeout`, each of which returns an
/// [`Output`] for the caller to act upon.
#[derive(Derivative)]
#[derivative(Debug(bound = "U: Debug, U::Signature: Debug + Clone"))]
pub struct MinBft<P: RequestPayload, U: Usig>
where
    U::Attestation: Clone,
    U::Signature: Clone + Debug + Serialize,
{
    /// The USIG service used for attesting, for signing `Prepare`s and for
    /// validating peer messages.
    usig: U,
    /// The configuration of this replica.
    config: Config,
    /// Processes and batches incoming client requests.
    request_processor: RequestProcessor<P, U>,
    /// Buffer handed to every broadcast.
    // NOTE(review): presumably records the USIG-signed messages sent so far — confirm.
    sent_usig_msgs: Vec<UsigMessage<P, U::Signature>>,
    /// One state entry per replica of the configuration.
    replicas_state: Vec<ReplicaState<P, U::Signature>>,
    /// Whether the replica is inside a view or changing views.
    view_state: ViewState<P, U::Signature>,
    /// IDs of replicas.
    // NOTE(review): presumably those from which a Hello was received — confirm.
    recv_hellos: HashSet<ReplicaId>,
    /// Starts out as `None`.
    // NOTE(review): presumably the counter of the last accepted Prepare — confirm.
    counter_last_accepted_prep: Option<Count>,
    /// Collector of ReqViewChange messages.
    collector_rvc: CollectorReqViewChanges,
    /// Collector of ViewChange messages.
    collector_vc: CollectorViewChanges<P, U::Signature>,
    /// The last checkpoint certificate, if any; starts out as `None`.
    last_checkpoint_cert: Option<CheckpointCertificate<U::Signature>>,
    /// Collector of Checkpoint messages.
    collector_checkpoints: CollectorCheckpoints<U::Signature>,
    /// The timeout duration currently in use. It starts at the configured
    /// initial timeout duration and is multiplied by `BACKOFF_MULTIPLIER`
    /// whenever a view-change times out.
    current_timeout_duration: Duration,
}
impl<P: RequestPayload, U: Usig> MinBft<P, U>
where
U::Attestation: Clone,
U::Signature: Clone + Serialize,
U::Signature: Debug,
{
/// Creates a new replica from the given USIG service and configuration.
///
/// Besides the replica itself, the initial [`Output`] is returned; it
/// contains the broadcast of the replica's `Hello` (attestation) message.
///
/// # Errors
///
/// Returns an error if the USIG service fails to produce an attestation.
pub fn new(usig: U, config: Config) -> Result<(Self, Output<P, U>)> {
    let _minbft_span = error_span!("minbft", id = config.id.as_u64()).entered();

    // NOTE(review): the outcome of `validate` is not inspected here —
    // presumably it panics on an invalid configuration; confirm.
    config.validate();

    // Everything derived from `config` is computed up front, because `config`
    // itself is moved into the replica below.
    let replicas_state: Vec<_> = config
        .all_replicas()
        .map(|_| ReplicaState::default())
        .collect();
    let request_processor = RequestProcessor::new(config.batch_timeout, config.max_batch_size);
    let current_timeout_duration = config.initial_timeout_duration;

    let mut minbft = Self {
        usig,
        config,
        request_processor,
        sent_usig_msgs: Vec::new(),
        replicas_state,
        view_state: ViewState::new(),
        recv_hellos: HashSet::new(),
        counter_last_accepted_prep: None,
        collector_rvc: CollectorReqViewChanges::new(),
        collector_vc: CollectorViewChanges::new(),
        last_checkpoint_cert: None,
        collector_checkpoints: CollectorCheckpoints::new(),
        current_timeout_duration,
    };

    let output = minbft.attest()?.reflect(&mut minbft);
    Ok((minbft, output))
}
/// Returns the ID of the primary of the current view, or `None` while the
/// replica is in the middle of changing views.
pub fn primary(&self) -> Option<ReplicaId> {
    match &self.view_state {
        ViewState::ChangeInProgress(_) => None,
        ViewState::InView(in_view) => {
            let current_primary = self.config.primary(in_view.view);
            Some(current_primary)
        }
    }
}
pub fn handle_client_message(&mut self, client_id: ClientId, request: P) -> Output<P, U> {
let _minbft_span = error_span!("minbft", id = self.config.id.as_u64()).entered();
let req_id = request.id();
trace!(
"Handling client request (ID: {:?}, client ID: {:?}) ...",
req_id,
client_id
);
let mut output = NotReflectedOutput::new(&self.config, &self.recv_hellos);
trace!(
"Verifying client request (ID {:?}, client ID: {:?}) ...",
req_id,
client_id
);
if request.verify(client_id).is_err() {
error!(
"Failed to handle client request (ID: {:?}, client ID: {:?}): Verification of client request failed.",
req_id,
client_id
);
output.error(Error::Request {
receiver: self.config.id,
client_id,
});
return output.reflect(self);
}
trace!(
"Successfully verified client request (ID: {:?}, client ID: {:?}).",
req_id,
client_id
);
let client_request = ClientRequest {
client: client_id,
payload: request,
};
let (start_client_timeout, prepare_content, batch_timeout_request) =
self.request_processor.process_client_req(
client_request,
&self.view_state,
self.current_timeout_duration,
&self.config,
);
if let Some(client_timeout) = start_client_timeout {
output.timeout_request(client_timeout);
}
if let Some(prepare_content) = prepare_content {
match Prepare::sign(prepare_content, &mut self.usig) {
Ok(prepare) => {
trace!("Broadcast Prepare (view: {:?}, counter: {:?}) for client request (ID: {:?}, client ID: {:?}).", prepare.view, prepare.counter(), req_id, client_id);
output.broadcast(prepare, &mut self.sent_usig_msgs);
}
Err(usig_error) => {
error!("Failed to handle client request (ID: {:?}, client ID: {:?}): Failed to sign Prepare for client request before broadcasting it. For further information see output.", req_id, client_id);
output.process_usig_error(usig_error, self.config.me(), "Prepare");
return output.reflect(self);
}
}
}
if let Some(batch_timeout_request) = batch_timeout_request {
output.timeout_request(batch_timeout_request);
}
output.reflect(self)
}
/// Handles a message received from another replica.
///
/// The message is validated first; a validation failure is reported as an
/// error in the returned output and the message is not processed any further.
/// A valid message is passed on to the peer-message processing.
///
/// # Parameters
///
/// * `from` - The ID of the replica the message originates from.
/// * `message` - The received message.
///
/// # Returns
///
/// The [`Output`] that resulted from handling the message.
///
/// # Panics
///
/// Panics if `from` is the ID of this replica itself, or if `from` is not
/// smaller than the configured number of replicas.
pub fn handle_peer_message(
    &mut self,
    from: ReplicaId,
    message: PeerMessage<U::Attestation, P, U::Signature>,
) -> Output<P, U> {
    let _minbft_span = error_span!("minbft", id = self.config.id.as_u64()).entered();
    let msg_type = message.msg_type();
    trace!(
        "Handling message (origin: {:?}, type: {:?}) ...",
        from,
        msg_type,
    );

    // Preconditions: a replica never receives its own messages, and the
    // sender must be one of the configured replicas.
    assert_ne!(from, self.config.me());
    assert!(from.as_u64() < self.config.n.get());

    let mut output = NotReflectedOutput::new(&self.config, &self.recv_hellos);
    let message = match message.validate(from, &self.config, &mut self.usig) {
        Ok(validated) => validated,
        Err(output_inner_error) => {
            output.error(output_inner_error.into());
            return output.reflect(self);
        }
    };

    self.process_peer_message(from, message, &mut output);
    // The second value is the message type; label it accordingly (it was
    // previously mislabelled as "from").
    trace!(
        "Successfully handled message (origin: {:?}, type: {:?}).",
        from,
        msg_type
    );
    output.reflect(self)
}
pub fn handle_timeout(&mut self, timeout_type: TimeoutType) -> Output<P, U> {
let _minbft_span = error_span!("minbft", id = self.config.id.as_u64()).entered();
debug!("Handling timeout (type: {:?}) ...", timeout_type);
let mut output = NotReflectedOutput::new(&self.config, &self.recv_hellos);
match timeout_type {
TimeoutType::Batch => match &self.view_state {
ViewState::InView(in_view) => {
let (maybe_batch, stop_timeout_request) =
self.request_processor.request_batcher.timeout();
output.timeout_request(stop_timeout_request);
let origin = self.config.me();
if let Some(batch) = maybe_batch {
if self.config.me_primary(in_view.view) {
trace!("Creating Prepare for timed out batch ...");
match Prepare::sign(
PrepareContent {
view: in_view.view,
origin,
request_batch: batch,
},
&mut self.usig,
) {
Ok(prepare) => {
trace!("Successfully created Prepare for timed-out batch.");
trace!("Broadcast Prepare (view: {:?}, counter: {:?}) for timed-out batch.", prepare.view, prepare.counter());
output.broadcast(prepare, &mut self.sent_usig_msgs);
trace!(
"Successfully handled timeout (type: {:?}).",
timeout_type
);
}
Err(usig_error) => {
error!("Failed to handle timeout (type: {:?}): Failed to sign Prepare for batch before broadcasting it. For further information see output.", timeout_type);
output.process_usig_error(usig_error, origin, "Prepare");
}
};
} else {
trace!("Ignoring timed out batch as replica is no longer the primary (current View: {})", in_view.view);
}
}
}
ViewState::ChangeInProgress(in_progress) => {
warn!("Handling timeout resulted in skipping creation of Prepare for timed out batch: Replica is in progress of changing views (from: {:?}, to: {:?}).", in_progress.prev_view, in_progress.next_view);
}
},
TimeoutType::Client => match &mut self.view_state {
ViewState::InView(in_view) => {
warn!("Client request timed out.");
if !in_view.has_requested_view_change {
in_view.has_requested_view_change = true;
let msg = ReqViewChange {
prev_view: in_view.view,
next_view: in_view.view + 1,
};
info!(
"Broadcast ReqViewChange (previous view: {:?}, next view: {:?}).",
msg.prev_view, msg.next_view
);
output.broadcast(msg, &mut self.sent_usig_msgs)
} else {
trace!("Already broadcast ReqViewChange (previous view: {:?}, next view: {:?}).", in_view.view, in_view.view + 1);
}
trace!("Successfully handled timeout (type: {:?}).", timeout_type);
}
ViewState::ChangeInProgress(in_progress) => {
warn!("Handling timeout resulted in skipping creation of ReqViewChange: Replica is in progress of changing views (from: {:?}, to: {:?}).", in_progress.prev_view, in_progress.next_view);
}
},
TimeoutType::ViewChange => match &mut self.view_state {
ViewState::InView(in_view) => {
warn!("Handling timeout resulted in skipping creation of ReqViewChange: Replica is in view ({:?}).", in_view.view);
}
ViewState::ChangeInProgress(in_progress) => {
warn!("View-Change timed out.");
self.current_timeout_duration *= BACKOFF_MULTIPLIER as u32;
in_progress.has_broadcast_view_change = false;
let msg = ReqViewChange {
prev_view: in_progress.prev_view,
next_view: in_progress.next_view + 1,
};
info!(
"Broadcast ReqViewChange (previous view: {:?}, next view: {:?}).",
msg.prev_view, msg.next_view
);
output.broadcast(msg, &mut self.sent_usig_msgs);
trace!("Successfully handled timeout (type: {:?}).", timeout_type);
}
},
}
output.reflect(self)
}
/// Obtains an attestation from the USIG service and broadcasts it to the
/// other replicas as a `Hello` message.
///
/// # Errors
///
/// Returns an error if the USIG service fails to produce the attestation.
fn attest(&mut self) -> Result<NotReflectedOutput<P, U>> {
    let hello = ValidatedPeerMessage::Hello(self.usig.attest()?);
    let mut output = NotReflectedOutput::new(&self.config, &self.recv_hellos);
    output.broadcast(hello, &mut self.sent_usig_msgs);
    Ok(output)
}
}