use std::collections::BTreeMap;
use std::fmt;
use crate::ChangeMembers;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::base::BoxOnce;
use crate::core::raft_msg::external_command::ExternalCommand;
#[cfg(feature = "runtime-stats")]
use crate::core::runtime_stats::RuntimeStats;
use crate::display_ext::DisplayBTreeMapDebugValueExt;
use crate::errors::Infallible;
use crate::errors::InitializeError;
use crate::errors::LinearizableReadError;
use crate::impls::ProgressResponder;
use crate::raft::AppendEntriesRequest;
use crate::raft::ClientWriteResult;
use crate::raft::ReadPolicy;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::linearizable_read::Linearizer;
use crate::raft::responder::core_responder::CoreResponder;
use crate::raft::stream_append::StreamAppendResult;
use crate::type_config::alias::BatchOf;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::EntryPayloadOf;
#[cfg(feature = "runtime-stats")]
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::VoteOf;
pub(crate) mod external_command;
mod raft_msg_name;
pub use raft_msg_name::ExternalCommandName;
pub use raft_msg_name::RaftMsgName;
pub(crate) type ResultSender<C, T, E = Infallible> = OneshotSenderOf<C, Result<T, E>>;
pub(crate) type VoteTx<C> = OneshotSenderOf<C, VoteResponse<C>>;
pub(crate) type AppendEntriesTx<C> = OneshotSenderOf<C, StreamAppendResult<C>>;
pub(crate) type ClientReadTx<C> = ResultSender<C, Linearizer<C>, LinearizableReadError<C>>;
pub(crate) enum RaftMsg<C>
where C: RaftTypeConfig
{
AppendEntries {
rpc: AppendEntriesRequest<C>,
tx: AppendEntriesTx<C>,
},
RequestVote {
rpc: VoteRequest<C>,
tx: VoteTx<C>,
},
InstallSnapshot {
vote: VoteOf<C>,
snapshot: SnapshotOf<C>,
tx: OneshotSenderOf<C, SnapshotResponse<C>>,
},
GetSnapshotReceiver {
tx: OneshotSenderOf<C, SnapshotDataOf<C>>,
},
ClientWrite {
payloads: BatchOf<C, EntryPayloadOf<C>>,
responders: BatchOf<C, Option<CoreResponder<C>>>,
expected_leader: Option<CommittedLeaderIdOf<C>>,
#[cfg(feature = "runtime-stats")]
proposed_at: InstantOf<C>,
},
GetLinearizer {
read_policy: ReadPolicy,
tx: ClientReadTx<C>,
},
Initialize {
members: BTreeMap<C::NodeId, C::Node>,
tx: ResultSender<C, (), InitializeError<C>>,
},
ChangeMembership {
changes: ChangeMembers<C::NodeId, C::Node>,
retain: bool,
tx: ProgressResponder<C, ClientWriteResult<C>>,
},
WithRaftState {
req: BoxOnce<'static, RaftState<C>>,
},
HandleTransferLeader {
from: VoteOf<C>,
to: C::NodeId,
},
ExternalCommand {
cmd: ExternalCommand<C>,
},
#[cfg(feature = "runtime-stats")]
GetRuntimeStats {
tx: OneshotSenderOf<C, RuntimeStats<C>>,
},
}
impl<C: RaftTypeConfig> RaftMsg<C> {
pub fn name(&self) -> RaftMsgName {
match self {
RaftMsg::AppendEntries { .. } => RaftMsgName::AppendEntries,
RaftMsg::RequestVote { .. } => RaftMsgName::RequestVote,
RaftMsg::InstallSnapshot { .. } => RaftMsgName::InstallSnapshot,
RaftMsg::GetSnapshotReceiver { .. } => RaftMsgName::GetSnapshotReceiver,
RaftMsg::ClientWrite { .. } => RaftMsgName::ClientWrite,
RaftMsg::GetLinearizer { .. } => RaftMsgName::GetLinearizer,
RaftMsg::Initialize { .. } => RaftMsgName::Initialize,
RaftMsg::ChangeMembership { .. } => RaftMsgName::ChangeMembership,
RaftMsg::HandleTransferLeader { .. } => RaftMsgName::HandleTransferLeader,
RaftMsg::WithRaftState { .. } => RaftMsgName::WithRaftState,
RaftMsg::ExternalCommand { cmd } => RaftMsgName::ExternalCommand(cmd.name()),
#[cfg(feature = "runtime-stats")]
RaftMsg::GetRuntimeStats { .. } => RaftMsgName::GetRuntimeStats,
}
}
}
impl<C> fmt::Display for RaftMsg<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RaftMsg::AppendEntries { rpc, .. } => {
write!(f, "AppendEntries: {}", rpc)
}
RaftMsg::RequestVote { rpc, .. } => {
write!(f, "RequestVote: {}", rpc)
}
RaftMsg::GetSnapshotReceiver { .. } => {
write!(f, "GetSnapshotReceiver")
}
RaftMsg::InstallSnapshot { vote, snapshot, .. } => {
write!(f, "InstallSnapshot: vote: {}, snapshot: {}", vote, snapshot)
}
RaftMsg::ClientWrite { .. } => write!(f, "ClientWrite"),
RaftMsg::GetLinearizer { read_policy, .. } => {
write!(f, "GetLinearizer: {}", read_policy)
}
RaftMsg::Initialize { members, .. } => {
write!(f, "Initialize: {}", members.display())
}
RaftMsg::ChangeMembership { changes, retain, .. } => {
write!(f, "ChangeMembership: {}, retain: {}", changes, retain)
}
RaftMsg::WithRaftState { .. } => write!(f, "WithRaftState"),
RaftMsg::HandleTransferLeader { from, to } => {
write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to)
}
RaftMsg::ExternalCommand { cmd } => {
write!(f, "ExternalCommand: {}", cmd)
}
#[cfg(feature = "runtime-stats")]
RaftMsg::GetRuntimeStats { .. } => {
write!(f, "GetRuntimeStats")
}
}
}
}