use crate::{
consensus::{DkgFailureProofSet, Vote},
messages::Message,
relocation::SignedRelocateDetails,
section::{EldersInfo, SectionKeyShare},
};
use bls_signature_aggregator::Proof;
use bytes::Bytes;
use hex_fmt::HexFmt;
use sn_messaging::{
node::NodeMessage, section_info::Message as SectionInfoMsg, Itinerary, MessageType,
};
use std::{
fmt::{self, Debug, Formatter},
net::SocketAddr,
slice,
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use tokio::sync::mpsc;
#[allow(clippy::large_enum_variant)]
pub(crate) enum Command {
HandleMessage {
sender: Option<SocketAddr>,
message: Message,
},
HandleSectionInfoMsg {
sender: SocketAddr,
message: SectionInfoMsg,
},
HandleTimeout(u64),
HandleConnectionLost(SocketAddr),
HandlePeerLost(SocketAddr),
HandleConsensus { vote: Vote, proof: Proof },
HandleDkgOutcome {
elders_info: EldersInfo,
outcome: SectionKeyShare,
},
HandleDkgFailure(DkgFailureProofSet),
SendMessage {
recipients: Vec<SocketAddr>,
delivery_group_size: usize,
message: MessageType,
},
SendUserMessage {
itinerary: Itinerary,
content: Bytes,
},
ScheduleTimeout { duration: Duration, token: u64 },
Relocate {
bootstrap_addrs: Vec<SocketAddr>,
details: SignedRelocateDetails,
message_rx: mpsc::Receiver<(MessageType, SocketAddr)>,
},
SetJoinsAllowed(bool),
}
impl Command {
pub fn send_message_to_node(recipient: &SocketAddr, message_bytes: Bytes) -> Self {
Self::send_message_to_nodes(slice::from_ref(recipient), 1, message_bytes)
}
pub fn send_message_to_nodes(
recipients: &[SocketAddr],
delivery_group_size: usize,
message_bytes: Bytes,
) -> Self {
let node_msg = NodeMessage::new(message_bytes);
Self::SendMessage {
recipients: recipients.to_vec(),
delivery_group_size,
message: MessageType::NodeMessage(node_msg),
}
}
}
impl Debug for Command {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Self::HandleMessage { sender, message } => f
.debug_struct("HandleMessage")
.field("sender", sender)
.field("message", message)
.finish(),
Self::HandleSectionInfoMsg { sender, message } => f
.debug_struct("HandleSectionInfoMsg")
.field("sender", sender)
.field("message", message)
.finish(),
Self::HandleTimeout(token) => f.debug_tuple("HandleTimeout").field(token).finish(),
Self::HandleConnectionLost(addr) => {
f.debug_tuple("HandleConnectionLost").field(addr).finish()
}
Self::HandlePeerLost(addr) => f.debug_tuple("HandlePeerLost").field(addr).finish(),
Self::HandleConsensus { vote, proof } => f
.debug_struct("HandleConsensus")
.field("vote", vote)
.field("proof.public_key", &proof.public_key)
.finish(),
Self::HandleDkgOutcome {
elders_info,
outcome,
} => f
.debug_struct("HandleDkgOutcome")
.field("elders_info", elders_info)
.field("outcome", &outcome.public_key_set.public_key())
.finish(),
Self::HandleDkgFailure(proofs) => {
f.debug_tuple("HandleDkgFailure").field(proofs).finish()
}
Self::SendMessage {
recipients,
delivery_group_size,
message,
} => f
.debug_struct("SendMessage")
.field("recipients", recipients)
.field("delivery_group_size", delivery_group_size)
.field("message", message)
.finish(),
Self::SendUserMessage { itinerary, content } => f
.debug_struct("SendUserMessage")
.field("itinerary", itinerary)
.field("content", &format_args!("{:10}", HexFmt(content)))
.finish(),
Self::ScheduleTimeout { duration, token } => f
.debug_struct("ScheduleTimeout")
.field("duration", duration)
.field("token", token)
.finish(),
Self::Relocate {
bootstrap_addrs,
details,
..
} => f
.debug_struct("Relocate")
.field("bootstrap_addrs", bootstrap_addrs)
.field("details", details)
.finish(),
Self::SetJoinsAllowed(joins_allowed) => f
.debug_tuple("SetJoinsAllowed")
.field(joins_allowed)
.finish(),
}
}
}
pub(crate) fn next_timer_token() -> u64 {
static NEXT: AtomicU64 = AtomicU64::new(0);
NEXT.fetch_add(1, Ordering::Relaxed)
}