use crate::node::{core::NodeContext, messaging::Peers, SectionStateVote, XorName};
use qp2p::SendStream;
use sn_consensus::Decision;
use sn_fault_detection::IssueType;
use sn_interface::{
messaging::{
data::{ClientDataResponse, ClientMsg},
system::{NodeDataResponse, NodeMsg, SectionSig, SectionSigned},
AuthorityProof, ClientAuth, MsgId, WireMsg,
},
network_knowledge::{NodeState, SectionAuthorityProvider, SectionKeyShare, SectionsDAG},
types::{DataAddress, Peer},
};
use custom_debug::Debug;
use std::{collections::BTreeSet, fmt, time::SystemTime};
/// A [`Cmd`] bundled with bookkeeping metadata.
///
/// `Debug` here is the `custom_debug::Debug` derive imported at the top of
/// this file.
#[derive(Debug)]
pub(crate) struct CmdJob {
/// Numeric identifier of this job.
id: usize,
/// Identifier of a parent job, if any.
// NOTE(review): presumably the job that spawned this one — confirm against the code that constructs `CmdJob`.
parent_id: Option<usize>,
/// The command carried by this job.
cmd: Cmd,
/// Wall-clock timestamp associated with the job's creation.
created_at: SystemTime,
}
/// Commands dispatched within a node.
///
/// Each variant names one unit of work and carries the data that work needs.
/// How a variant is executed is not defined in this enum; the per-variant
/// notes below are derived from the variant names, field names and field
/// types only, and should be confirmed against the handling code.
///
/// The `context: NodeContext` fields are marked `#[debug(skip)]`, so they are
/// left out of the derived (`custom_debug`) `Debug` output.
// The lint is silenced rather than boxing payloads: variants range from the
// unit `TryJoinNetwork` to struct variants holding whole messages inline.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub(crate) enum Cmd {
/// Attempt to join the network.
TryJoinNetwork,
/// Handle `wire_msg`, which arrived from `origin`.
// NOTE(review): `send_stream`, when present, is presumably the stream to reply on — confirm.
HandleMsg {
origin: Peer,
wire_msg: WireMsg,
send_stream: Option<SendStream>,
},
/// Set the "joins allowed" flag to the given value.
SetJoinsAllowed(bool),
/// Set the "joins allowed until split" flag to the given value.
SetJoinsAllowedUntilSplit(bool),
/// Track `issue` against the node identified by `name`.
TrackNodeIssue {
name: XorName,
issue: IssueType,
},
/// Update network knowledge and then handle an already-validated client message.
// NOTE(review): `proof_chain` and `signed_sap` are presumably the update inputs,
// and `auth` the client's authority over `msg` — confirm against the handler.
UpdateNetworkAndHandleValidClientMsg {
proof_chain: SectionsDAG,
signed_sap: SectionSigned<SectionAuthorityProvider>,
msg_id: MsgId,
msg: ClientMsg,
origin: Peer,
send_stream: SendStream,
auth: AuthorityProof<ClientAuth>,
#[debug(skip)]
context: NodeContext,
},
/// Handle a failure to send the message `msg_id` to `peer`.
HandleFailedSendToNode {
peer: Peer,
msg_id: MsgId,
},
/// Handle agreement on a section-state `proposal`, together with its `sig`.
HandleSectionDecisionAgreement {
proposal: SectionStateVote,
sig: SectionSig,
},
/// Handle a consensus `Decision` over `NodeState` (a membership decision).
HandleMembershipDecision(Decision<NodeState>),
/// Handle agreement on a new set of elders, together with its `sig`.
HandleNewEldersAgreement {
new_elders: SectionSigned<SectionAuthorityProvider>,
sig: SectionSig,
},
/// Handle agreement on two new sections, each given as a signed
/// `SectionAuthorityProvider` plus a signature.
// NOTE(review): presumably the two halves produced by a section split — confirm.
HandleNewSectionsAgreement {
sap1: SectionSigned<SectionAuthorityProvider>,
sig1: SectionSig,
sap2: SectionSigned<SectionAuthorityProvider>,
sig2: SectionSig,
},
/// Handle the outcome of a DKG round: the key share `outcome` for `section_auth`.
HandleDkgOutcome {
section_auth: SectionAuthorityProvider,
outcome: SectionKeyShare,
},
/// Enqueue the data addresses in `data_batch` for replication to `recipient`.
EnqueueDataForReplication {
recipient: Peer,
data_batch: Vec<DataAddress>,
},
/// Send `msg`, identified by `msg_id`, to `recipients`.
SendMsg {
msg: NodeMsg,
msg_id: MsgId,
recipients: Peers,
#[debug(skip)]
context: NodeContext,
},
/// Send `msg` to the set of `recipients`.
// NOTE(review): going by the name, any response received is enqueued for handling — confirm.
SendMsgEnqueueAnyResponse {
msg: NodeMsg,
msg_id: MsgId,
recipients: BTreeSet<Peer>,
#[debug(skip)]
context: NodeContext,
},
/// Send the node message `msg` as a response to `recipient` over `send_stream`.
SendNodeMsgResponse {
msg: NodeMsg,
msg_id: MsgId,
recipient: Peer,
send_stream: SendStream,
#[debug(skip)]
context: NodeContext,
},
/// Send the data response `msg` to `source_client` over `send_stream`.
// NOTE(review): `correlation_id` is presumably the id of the message being answered — confirm.
SendClientResponse {
msg: ClientDataResponse,
correlation_id: MsgId,
send_stream: SendStream,
#[debug(skip)]
context: NodeContext,
source_client: Peer,
},
/// Send the data response `msg` to `requesting_peer` over `send_stream`.
// NOTE(review): `correlation_id` is presumably the id of the message being answered — confirm.
SendNodeDataResponse {
msg: NodeDataResponse,
correlation_id: MsgId,
send_stream: SendStream,
#[debug(skip)]
context: NodeContext,
requesting_peer: Peer,
},
/// Send `msg` to `targets`, await a response, and respond to `source_client`
/// over `client_stream`.
SendMsgAwaitResponseAndRespondToClient {
msg_id: MsgId,
msg: NodeMsg,
#[debug(skip)]
context: NodeContext,
targets: BTreeSet<Peer>,
client_stream: SendStream,
source_client: Peer,
},
/// Propose a vote to mark the named nodes as offline.
ProposeVoteNodesOffline(BTreeSet<XorName>),
}
impl Cmd {
    /// Builds a `Cmd::SendMsg` carrying `msg` for `recipients`.
    ///
    /// A new `MsgId` is created for the message; the id, the recipients and
    /// the message itself are emitted through `debug!` before the command is
    /// returned.
    pub(crate) fn send_msg(msg: NodeMsg, recipients: Peers, context: NodeContext) -> Self {
        let msg_id = MsgId::new();
        debug!("Sending msg {msg_id:?} to {recipients:?}: {msg:?}");

        Self::SendMsg {
            msg,
            msg_id,
            recipients,
            context,
        }
    }

    /// Returns the statemap `State` under which this command is reported.
    ///
    /// The match is exhaustive on purpose (no catch-all arm), so adding a
    /// variant to `Cmd` forces a decision here.
    pub(crate) fn statemap_state(&self) -> sn_interface::statemap::State {
        use sn_interface::statemap::State;

        match self {
            // Outgoing traffic, plus the handling of a failed send.
            Self::SendMsg { .. }
            | Self::SendMsgEnqueueAnyResponse { .. }
            | Self::SendNodeMsgResponse { .. }
            | Self::SendClientResponse { .. }
            | Self::SendNodeDataResponse { .. }
            | Self::SendMsgAwaitResponseAndRespondToClient { .. }
            | Self::HandleFailedSendToNode { .. } => State::Comms,

            // Incoming messages.
            Self::HandleMsg { .. } => State::HandleMsg,
            Self::UpdateNetworkAndHandleValidClientMsg { .. } => State::ClientMsg,

            Self::TrackNodeIssue { .. } => State::FaultDetection,
            Self::HandleSectionDecisionAgreement { .. } => State::Agreement,

            Self::HandleMembershipDecision(_) | Self::ProposeVoteNodesOffline(_) => {
                State::Membership
            }

            Self::HandleNewEldersAgreement { .. } | Self::HandleNewSectionsAgreement { .. } => {
                State::Handover
            }

            Self::HandleDkgOutcome { .. } => State::Dkg,
            Self::EnqueueDataForReplication { .. } => State::Replication,

            // The join-permission toggles are reported under `Data`.
            Self::SetJoinsAllowed(_) | Self::SetJoinsAllowedUntilSplit(_) => State::Data,

            Self::TryJoinNetwork => State::Join,
        }
    }
}
impl fmt::Display for Cmd {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Cmd::HandleMsg { wire_msg, .. } => {
write!(f, "HandleMsg {:?}", wire_msg.msg_id())
}
Cmd::UpdateNetworkAndHandleValidClientMsg { msg_id, msg, .. } => {
write!(f, "UpdateAndHandleValidClientMsg {msg_id:?}: {msg:?}")
}
Cmd::HandleFailedSendToNode { peer, msg_id } => {
write!(f, "HandlePeerFailedSend({:?}, {:?})", peer.name(), msg_id)
}
Cmd::HandleSectionDecisionAgreement { .. } => {
write!(f, "HandleSectionDecisionAgreement")
}
Cmd::HandleNewEldersAgreement { .. } => write!(f, "HandleNewEldersAgreement"),
Cmd::HandleNewSectionsAgreement { .. } => write!(f, "HandleNewSectionsAgreement"),
Cmd::HandleMembershipDecision(_) => write!(f, "HandleMembershipDecision"),
Cmd::HandleDkgOutcome { .. } => write!(f, "HandleDkgOutcome"),
Cmd::SendMsg { .. } => write!(f, "SendMsg"),
Cmd::SendMsgEnqueueAnyResponse { .. } => write!(f, "SendMsgEnqueueAnyResponse"),
Cmd::SendNodeMsgResponse { .. } => write!(f, "SendNodeMsgResponse"),
Cmd::SendClientResponse { .. } => write!(f, "SendClientResponse"),
Cmd::SendNodeDataResponse { .. } => write!(f, "SendNodeDataResponse"),
Cmd::SendMsgAwaitResponseAndRespondToClient { .. } => {
write!(f, "SendMsgAwaitResponseAndRespondToClient")
}
Cmd::EnqueueDataForReplication { .. } => write!(f, "EnqueueDataForReplication"),
Cmd::TrackNodeIssue { name, issue } => {
write!(f, "TrackNodeIssue {name:?}, {issue:?}")
}
Cmd::ProposeVoteNodesOffline(_) => write!(f, "ProposeOffline"),
Cmd::SetJoinsAllowed { .. } => write!(f, "SetJoinsAllowed"),
Cmd::SetJoinsAllowedUntilSplit { .. } => write!(f, "SetJoinsAllowedUntilSplit"),
Cmd::TryJoinNetwork => write!(f, "TryJoinNetwork"),
}
}
}