use std::collections::HashSet;
use std::time::Duration;
use shared_ids::{ClientId, ReplicaId};
use tracing::{error_span, info, trace};
use usig::{Usig, UsigError};
use crate::timeout::TimeoutAny;
use crate::{Config, Error};
use crate::{
peer_message::{usig_message::UsigMessage, ValidatedPeerMessage},
timeout::Timeout,
PeerMessage, RequestPayload,
};
/// Token that every [`Reflectable`] method takes as an argument.
///
/// The single tuple field is private, so a value can only be constructed
/// where that field is visible; in the code shown here that happens only in
/// `NotReflectedOutput::reflect`.
/// NOTE(review): presumably this exists to keep other code from calling the
/// `Reflectable` methods directly — confirm.
pub(super) struct OutputRestricted(());
/// Boxed slice of [`PeerMessage`]s; the type of [`Output::broadcasts`].
type BroadcastList<Att, P, Sig> = Box<[PeerMessage<Att, P, Sig>]>;
/// View-related state reported in [`Output::view_info`].
///
/// The common traits are derived so that callers can log, compare and copy
/// the value; the enum only carries plain `u64` data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ViewInfo {
    /// NOTE(review): presumably the number of the view the replica is
    /// currently in — confirm against the `Reflectable` implementor.
    InView(u64),
    /// NOTE(review): presumably a view change is in progress from view `from`
    /// to view `to` — confirm against the `Reflectable` implementor.
    ViewChange { from: u64, to: u64 },
}
/// Final, immutable output assembled by `NotReflectedOutput::reflect`.
///
/// The collected vectors are turned into boxed slices and combined with the
/// state queried from the [`Reflectable`] at the end of reflection.
pub struct Output<P, U: Usig> {
/// Messages collected via `NotReflectedOutput::broadcast`, each converted
/// into a [`PeerMessage`].
pub broadcasts: BroadcastList<U::Attestation, P, U::Signature>,
/// `(client, payload)` pairs collected via `NotReflectedOutput::response`.
pub responses: Box<[(ClientId, P)]>,
/// Timeout requests collected via `NotReflectedOutput::timeout_request`.
pub timeout_requests: Box<[TimeoutRequest]>,
/// Errors collected via `NotReflectedOutput::error` and
/// `NotReflectedOutput::process_usig_error`.
pub errors: Box<[Error]>,
/// Whether the replica is ready for client requests, i.e. whether
/// sufficient Hello messages have been received.
pub ready_for_client_requests: bool,
/// Value returned by `Reflectable::current_primary` at the end of `reflect`.
/// NOTE(review): `None` presumably means no primary is currently known — confirm.
pub primary: Option<ReplicaId>,
/// Value returned by `Reflectable::view_info` at the end of `reflect`.
pub view_info: ViewInfo,
/// Value returned by `Reflectable::round` at the end of `reflect`.
pub round: u64,
}
/// Mutable collector of output that has not yet been reflected.
///
/// It is filled through its methods and finally converted into an
/// [`Output`] by `reflect`.
pub(super) struct NotReflectedOutput<P, U: Usig> {
// Messages queued via `broadcast`; `reflect` feeds each one to the `Reflectable`.
broadcasts: Vec<ValidatedPeerMessage<U::Attestation, P, U::Signature>>,
// `(client, payload)` pairs queued via `response`.
responses: Vec<(ClientId, P)>,
// Timeout requests queued via `timeout_request`.
timeout_requests: Vec<TimeoutRequest>,
// Errors queued via `error` / `process_usig_error`.
errors: Vec<Error>,
// Initialised in `new`; set to `true` by `ready_for_client_requests`.
ready_for_client_requests: bool,
}
/// Interface that `NotReflectedOutput::reflect` drives.
///
/// Every method takes an [`OutputRestricted`] token as its last argument.
pub(super) trait Reflectable<P, U: Usig> {
/// Handles one message that was queued for broadcast.
///
/// `reflect` calls this once for every queued message, including messages
/// that get queued on `output` while earlier ones are being processed.
/// `output` is the collector to which further output may be added.
fn process_reflected_peer_message(
&mut self,
peer_message: ValidatedPeerMessage<U::Attestation, P, U::Signature>,
output: &mut NotReflectedOutput<P, U>,
restricted: OutputRestricted,
);
/// Returns the value that `reflect` stores in `Output::primary`.
fn current_primary(&self, restricted: OutputRestricted) -> Option<ReplicaId>;
/// Returns the value that `reflect` stores in `Output::view_info`.
fn view_info(&self, restricted: OutputRestricted) -> ViewInfo;
/// Returns the value that `reflect` stores in `Output::round`.
fn round(&self, restricted: OutputRestricted) -> u64;
}
impl<P: RequestPayload, U: Usig> NotReflectedOutput<P, U>
where
    U::Attestation: Clone,
    U::Signature: Clone,
{
    /// Creates an empty collector.
    ///
    /// `ready_for_client_requests` starts out as `true` only when the number
    /// of replicas in `recv_hello_from` equals `config.n`.
    pub(super) fn new(config: &Config, recv_hello_from: &HashSet<ReplicaId>) -> Self {
        let ready_for_client_requests = recv_hello_from.len() as u64 == config.n.get();
        NotReflectedOutput {
            broadcasts: Vec::new(),
            responses: Vec::new(),
            timeout_requests: Vec::new(),
            errors: Vec::new(),
            ready_for_client_requests,
        }
    }

    /// Queues `message` for broadcast.
    ///
    /// When the converted message is the `Usig` variant, a clone of the inner
    /// USIG message is additionally appended to `message_log`.
    pub(super) fn broadcast(
        &mut self,
        message: impl Into<ValidatedPeerMessage<U::Attestation, P, U::Signature>>,
        message_log: &mut Vec<UsigMessage<P, U::Signature>>,
    ) {
        let message = message.into();
        if let ValidatedPeerMessage::Usig(msg) = &message {
            message_log.push(msg.clone());
        }
        self.broadcasts.push(message);
    }

    /// Queues the response `output` for the client `client_id` and traces it.
    pub(super) fn response(&mut self, client_id: ClientId, output: P) {
        trace!(
            "Output response to client request (ID: {:?}, client ID: {:?}).",
            output.id(),
            client_id
        );
        self.responses.push((client_id, output));
    }

    /// Queues `timeout_request` after tracing its variant and details.
    pub(super) fn timeout_request(&mut self, timeout_request: TimeoutRequest) {
        match &timeout_request {
            TimeoutRequest::Start(timeout) => {
                trace!(
                    "Output request for starting timeout (type: {:?}, duration: {:?}, stop class: {:?}).",
                    timeout.timeout_type, timeout.duration, timeout.stop_class
                );
            }
            TimeoutRequest::Stop(timeout) => {
                trace!(
                    "Output request for stopping timeout (type: {:?}, duration: {:?}, stop class: {:?}).",
                    timeout.timeout_type, timeout.duration, timeout.stop_class
                );
            }
            TimeoutRequest::StopAny(timeout) => {
                trace!(
                    "Output request for stopping timeout (type: {:?}, duration: {:?} ).",
                    timeout.timeout_type,
                    timeout.duration
                );
            }
        }
        self.timeout_requests.push(timeout_request);
    }

    /// Wraps `usig_error` together with `replica` and `msg_type` into an
    /// [`Error::Usig`] and queues it.
    pub(super) fn process_usig_error(
        &mut self,
        usig_error: UsigError,
        replica: ReplicaId,
        msg_type: &'static str,
    ) {
        let output_error = Error::Usig {
            replica,
            msg_type,
            usig_error,
        };
        self.error(output_error);
    }

    /// Queues `output_error`.
    pub(super) fn error(&mut self, output_error: Error) {
        self.errors.push(output_error);
    }

    /// Marks the replica as ready for client requests and logs that fact.
    pub(super) fn ready_for_client_requests(&mut self) {
        info!(
            "Replica is ready for client requests as sufficient Hello messages have been received."
        );
        self.ready_for_client_requests = true;
    }

    /// Feeds every queued broadcast message to `reflectable` and assembles
    /// the final [`Output`].
    ///
    /// Messages are processed in queue order. Processing a message may queue
    /// further broadcasts on `self`; those are processed as well, so the loop
    /// runs until no unprocessed message remains. Afterwards the primary,
    /// view information and round are queried from `reflectable` (in that
    /// order) and stored in the returned value.
    pub(super) fn reflect<S: Reflectable<P, U>>(mut self, reflectable: &mut S) -> Output<P, U> {
        let _minbft_span = error_span!("reflecting").entered();

        // Walk the queue with an index cursor instead of snapshotting batches:
        // the methods of this type only ever append to `broadcasts`, so
        // entries added during processing are reached by the same loop and
        // each message is cloned exactly once.
        let mut next = 0;
        while let Some(pending) = self.broadcasts.get(next) {
            trace!(
                "Processing reflected message (type {:?}) ...",
                pending.msg_type()
            );
            let message = pending.clone();
            reflectable.process_reflected_peer_message(message, &mut self, OutputRestricted(()));
            // The clone was moved into the call above, so read the type from
            // the queued original for the trailing trace.
            if let Some(processed) = self.broadcasts.get(next) {
                trace!(
                    "Processed reflected message (type: {:?}).",
                    processed.msg_type()
                );
            }
            next += 1;
        }

        let broadcast = self.broadcasts.into_iter().map(|m| m.into()).collect();
        let primary = reflectable.current_primary(OutputRestricted(()));
        let view_info = reflectable.view_info(OutputRestricted(()));
        let round = reflectable.round(OutputRestricted(()));
        Output {
            broadcasts: broadcast,
            responses: self.responses.into_boxed_slice(),
            timeout_requests: self.timeout_requests.into_boxed_slice(),
            errors: self.errors.into_boxed_slice(),
            ready_for_client_requests: self.ready_for_client_requests,
            primary,
            view_info,
            round,
        }
    }
}
/// Request concerning a timeout, collected in `Output::timeout_requests`.
#[derive(Debug, Clone)]
pub enum TimeoutRequest {
/// Request to start the contained [`Timeout`].
Start(Timeout),
/// Request to stop the contained [`Timeout`].
Stop(Timeout),
/// Request built from a [`TimeoutAny`].
/// NOTE(review): presumably stops any timeout of the given type regardless
/// of its stop class — confirm against `TimeoutAny`.
StopAny(TimeoutAny),
}
impl TimeoutRequest {
pub(crate) fn new_start_batch_req(duration: Duration) -> Self {
Self::Start(Timeout::batch(duration))
}
pub(crate) fn new_start_client_req(client_id: ClientId, duration: Duration) -> Self {
Self::Start(Timeout::client(client_id, duration))
}
pub(crate) fn new_start_view_change(duration: Duration) -> Self {
Self::Start(Timeout::view_change(duration))
}
pub(crate) fn new_stop_batch_req() -> Self {
Self::Stop(Timeout::batch(Duration::from_secs(0)))
}
pub(crate) fn new_stop_client_req(client_id: ClientId) -> Self {
Self::Stop(Timeout::client(client_id, Duration::from_secs(0)))
}
pub(crate) fn new_stop_view_change() -> Self {
Self::Stop(Timeout::view_change(Duration::from_secs(0)))
}
pub(crate) fn new_stop_any_client_req() -> Self {
Self::StopAny(TimeoutAny::client(Duration::from_secs(0)))
}
}