use std::fmt;
use std::fmt::Debug;
use display_more::DisplayOptionExt;
use display_more::DisplayResultExt;
use display_more::DisplaySliceExt;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::async_runtime::OneshotSender;
use crate::core::sm;
use crate::engine::CommandKind;
use crate::engine::CommandName;
use crate::engine::replication_progress::TargetProgress;
use crate::errors::InitializeError;
use crate::errors::InstallSnapshotError;
use crate::progress::inflight_id::InflightId;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::message::TransferLeaderRequest;
use crate::raft::stream_append::StreamAppendResult;
use crate::raft_state::IOId;
use crate::raft_state::IOState;
use crate::replication::ReplicationSessionId;
use crate::replication::replicate::Replicate;
use crate::type_config::alias::BatchOf;
use crate::type_config::alias::CommittedVoteOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::VoteOf;
pub(crate) enum Command<C, SM = ()>
where C: RaftTypeConfig
{
UpdateIOProgress {
when: Option<Condition<C>>,
io_id: IOId<C>,
},
AppendEntries {
committed_vote: CommittedVoteOf<C>,
entries: BatchOf<C, C::Entry>,
},
ReplicateCommitted { committed: Option<LogIdOf<C>> },
BroadcastHeartbeat { session_id: ReplicationSessionId<C> },
SaveCommittedAndApply {
already_applied: Option<LogIdOf<C>>,
upto: LogIdOf<C>,
},
Replicate { target: C::NodeId, req: Replicate<C> },
ReplicateSnapshot {
leader_vote: CommittedVoteOf<C>,
target: C::NodeId,
inflight_id: InflightId,
},
BroadcastTransferLeader { req: TransferLeaderRequest<C> },
CloseReplicationStreams,
RebuildReplicationStreams {
leader_vote: CommittedVoteOf<C>,
targets: Vec<TargetProgress<C>>,
close_old_streams: bool,
},
SaveVote { vote: VoteOf<C> },
SendVote { vote_req: VoteRequest<C> },
PurgeLog { upto: LogIdOf<C> },
TruncateLog { after: Option<LogIdOf<C>> },
StateMachine { command: sm::Command<C, SM> },
Respond {
when: Option<Condition<C>>,
resp: Respond<C>,
},
}
impl<C, SM> Debug for Command<C, SM>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}
impl<C, SM> fmt::Display for Command<C, SM>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Command::UpdateIOProgress { when, io_id } => {
write!(f, "UpdateIOProgress: when: {}, io_id: {}", when.display(), io_id)
}
Command::AppendEntries {
committed_vote: vote,
entries,
} => {
write!(
f,
"AppendEntries: vote: {}, entries: {}",
vote,
entries.as_ref().display()
)
}
Command::ReplicateCommitted { committed } => {
write!(f, "ReplicateCommitted: {}", committed.display())
}
Command::BroadcastHeartbeat { session_id } => {
write!(f, "BroadcastHeartbeat: session_id:{}", session_id)
}
Command::SaveCommittedAndApply {
already_applied: already_committed,
upto,
} => write!(f, "SaveCommittedAndApply: ({}, {}]", already_committed.display(), upto),
Command::Replicate { target, req } => {
write!(f, "Replicate: target={}, req: {}", target, req)
}
Command::ReplicateSnapshot {
leader_vote,
target,
inflight_id,
} => {
write!(
f,
"ReplicateSnapshot: leader_vote: {}, target={}, inflight_id: {}",
leader_vote, target, inflight_id
)
}
Command::BroadcastTransferLeader { req } => write!(f, "TransferLeader: {}", req),
Command::CloseReplicationStreams => write!(f, "CloseReplicationStreams"),
Command::RebuildReplicationStreams {
leader_vote,
targets,
close_old_streams,
} => {
write!(
f,
"RebuildReplicationStreams: leader_vote: {}, targets: {}; close_old: {}",
leader_vote,
targets.display_n(10),
close_old_streams
)
}
Command::SaveVote { vote } => write!(f, "SaveVote: {}", vote),
Command::SendVote { vote_req } => write!(f, "SendVote: {}", vote_req),
Command::PurgeLog { upto } => write!(f, "PurgeLog: upto: {}", upto),
Command::TruncateLog { after } => write!(f, "TruncateLog: since: {}", after.display()),
Command::StateMachine { command } => write!(f, "StateMachine: command: {}", command),
Command::Respond { when, resp } => write!(f, "Respond: when: {}, resp: {}", when.display(), resp),
}
}
}
impl<C, SM> From<sm::Command<C, SM>> for Command<C, SM>
where C: RaftTypeConfig
{
fn from(cmd: sm::Command<C, SM>) -> Self {
Self::StateMachine { command: cmd }
}
}
impl<C, SM> PartialEq for Command<C, SM>
where
C: RaftTypeConfig,
C::Entry: PartialEq,
BatchOf<C, C::Entry>: PartialEq,
{
#[rustfmt::skip]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Command::UpdateIOProgress { when, io_id }, Command::UpdateIOProgress { when: wb, io_id: ab }, ) => when == wb && io_id == ab,
(Command::AppendEntries { committed_vote: vote, entries }, Command::AppendEntries { committed_vote: vb, entries: b }, ) => vote == vb && entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::BroadcastHeartbeat { session_id }, Command::BroadcastHeartbeat { session_id: sb }, ) => session_id == sb,
(Command::SaveCommittedAndApply { already_applied: already_committed, upto, }, Command::SaveCommittedAndApply { already_applied: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
(Command::BroadcastTransferLeader { req }, Command::BroadcastTransferLeader { req: b, }, ) => req == b,
(Command::RebuildReplicationStreams { leader_vote, targets, close_old_streams }, Command::RebuildReplicationStreams { leader_vote: lb, targets: b, close_old_streams: cb }, ) => leader_vote == lb && targets == b && close_old_streams == cb,
(Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
(Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b,
(Command::PurgeLog { upto }, Command::PurgeLog { upto: b }) => upto == b,
(Command::TruncateLog { after }, Command::TruncateLog { after: b }, ) => after == b,
(Command::Respond { when, resp: send }, Command::Respond { when: b_when, resp: b }) => send == b && when == b_when,
(Command::StateMachine { command }, Command::StateMachine { command: b }) => command == b,
(Command::CloseReplicationStreams, Command::CloseReplicationStreams) => true,
(Command::ReplicateSnapshot { leader_vote, target, inflight_id }, Command::ReplicateSnapshot { leader_vote: lb, target: tb, inflight_id: ib }) => leader_vote == lb && target == tb && inflight_id == ib,
_ => false,
}
}
}
impl<C, SM> Command<C, SM>
where C: RaftTypeConfig
{
#[allow(dead_code)]
#[rustfmt::skip]
pub(crate) fn name(&self) -> CommandName {
match self {
Command::UpdateIOProgress { .. } => CommandName::UpdateIOProgress,
Command::AppendEntries { .. } => CommandName::AppendEntries,
Command::ReplicateCommitted { .. } => CommandName::ReplicateCommitted,
Command::BroadcastHeartbeat { .. } => CommandName::BroadcastHeartbeat,
Command::SaveCommittedAndApply { .. } => CommandName::SaveCommittedAndApply,
Command::Replicate { .. } => CommandName::Replicate,
Command::ReplicateSnapshot { .. } => CommandName::ReplicateSnapshot,
Command::BroadcastTransferLeader { .. } => CommandName::BroadcastTransferLeader,
Command::CloseReplicationStreams => CommandName::CloseReplicationStreams,
Command::RebuildReplicationStreams { .. } => CommandName::RebuildReplicationStreams,
Command::SaveVote { .. } => CommandName::SaveVote,
Command::SendVote { .. } => CommandName::SendVote,
Command::PurgeLog { .. } => CommandName::PurgeLog,
Command::TruncateLog { .. } => CommandName::TruncateLog,
Command::StateMachine { command } => CommandName::StateMachine(command.name()),
Command::Respond { .. } => CommandName::Respond,
}
}
#[allow(dead_code)]
#[rustfmt::skip]
pub(crate) fn kind(&self) -> CommandKind {
match self {
Command::CloseReplicationStreams => CommandKind::Main,
Command::RebuildReplicationStreams { .. } => CommandKind::Main,
Command::Respond { .. } => CommandKind::Respond,
Command::UpdateIOProgress { .. } => CommandKind::Log,
Command::AppendEntries { .. } => CommandKind::Log,
Command::SaveVote { .. } => CommandKind::Log,
Command::TruncateLog { .. } => CommandKind::Log,
Command::PurgeLog { .. } => CommandKind::Log,
Command::ReplicateCommitted { .. } => CommandKind::Network,
Command::BroadcastHeartbeat { .. } => CommandKind::Network,
Command::Replicate { .. } => CommandKind::Network,
Command::ReplicateSnapshot { .. } => CommandKind::Network,
Command::BroadcastTransferLeader { .. } => CommandKind::Network,
Command::SendVote { .. } => CommandKind::Network,
Command::SaveCommittedAndApply { .. } => CommandKind::StateMachine,
Command::StateMachine { .. } => CommandKind::StateMachine,
}
}
#[rustfmt::skip]
pub(crate) fn condition(&self) -> Option<Condition<C>> {
match self {
Command::CloseReplicationStreams => None,
Command::RebuildReplicationStreams { .. } => None,
Command::Respond { when, .. } => when.clone(),
Command::UpdateIOProgress { when, .. } => when.clone(),
Command::AppendEntries { .. } => None,
Command::SaveVote { .. } => None,
Command::TruncateLog { .. } => None,
Command::PurgeLog { upto } => Some(Condition::Snapshot { log_id: upto.clone() }),
Command::ReplicateCommitted { .. } => None,
Command::BroadcastHeartbeat { .. } => None,
Command::Replicate { .. } => None,
Command::ReplicateSnapshot { .. } => None,
Command::BroadcastTransferLeader { .. } => None,
Command::SendVote { .. } => None,
Command::SaveCommittedAndApply { .. } => None,
Command::StateMachine { .. } => None,
}
}
}
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) enum Condition<C>
where C: RaftTypeConfig
{
IOFlushed { io_id: IOId<C> },
#[allow(dead_code)]
LogFlushed { log_id: LogIdOf<C> },
#[allow(dead_code)]
Applied { log_id: LogIdOf<C> },
Snapshot { log_id: LogIdOf<C> },
}
impl<C> fmt::Display for Condition<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Condition::IOFlushed { io_id } => {
write!(f, "IOFlushed >= {}", io_id)
}
Condition::LogFlushed { log_id } => {
write!(f, "LogFlushed >= {}", log_id)
}
Condition::Applied { log_id } => write!(f, "Applied >= {}", log_id),
Condition::Snapshot { log_id } => write!(f, "Snapshot >= {}", log_id),
}
}
}
impl<C> Condition<C>
where C: RaftTypeConfig
{
pub(crate) fn is_met(&self, io_state: &IOState<C>) -> bool {
match self {
Condition::IOFlushed { io_id } => self.is_satisfied_by(io_id, io_state.log_progress.flushed()),
Condition::LogFlushed { log_id } => self.is_satisfied_by(
log_id,
io_state.log_progress.flushed().and_then(|io_id| io_id.last_log_id()),
),
Condition::Applied { log_id } => self.is_satisfied_by(log_id, io_state.apply_progress.flushed()),
Condition::Snapshot { log_id } => self.is_satisfied_by(log_id, io_state.snapshot.flushed()),
}
}
fn is_satisfied_by<T>(&self, expected: &T, actual: Option<&T>) -> bool
where T: PartialOrd + fmt::Display {
let Some(actual) = actual else {
tracing::debug!("{} is not met: actual: None", self);
return false;
};
if actual >= expected {
tracing::debug!("{} is met: actual: {}", self, actual);
true
} else {
tracing::debug!("{} is not met: actual: {}", self, actual);
false
}
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(derive_more::From)]
pub(crate) enum Respond<C>
where C: RaftTypeConfig
{
Vote(ValueSender<C, VoteResponse<C>>),
AppendEntries(ValueSender<C, StreamAppendResult<C>>),
ReceiveSnapshotChunk(ValueSender<C, Result<(), InstallSnapshotError>>),
InstallSnapshot(ValueSender<C, Result<InstallSnapshotResponse<C>, InstallSnapshotError>>),
InstallFullSnapshot(ValueSender<C, SnapshotResponse<C>>),
Initialize(ValueSender<C, Result<(), InitializeError<C>>>),
}
impl<C> fmt::Display for Respond<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Respond::Vote(vs) => write!(f, "Vote {}", vs.value()),
Respond::AppendEntries(vs) => match vs.value() {
Ok(log_id) => write!(f, "AppendEntries Ok({})", log_id.display()),
Err(e) => write!(f, "AppendEntries Err({})", e),
},
Respond::ReceiveSnapshotChunk(vs) => {
write!(
f,
"ReceiveSnapshotChunk {}",
vs.value().as_ref().map(|_x| "()").display()
)
}
Respond::InstallSnapshot(vs) => write!(f, "InstallSnapshot {}", vs.value().display()),
Respond::InstallFullSnapshot(vs) => write!(f, "InstallFullSnapshot {}", vs.value()),
Respond::Initialize(vs) => write!(f, "Initialize {}", vs.value().as_ref().map(|_x| "()").display()),
}
}
}
impl<C> Respond<C>
where C: RaftTypeConfig
{
pub(crate) fn new<T>(res: T, tx: OneshotSenderOf<C, T>) -> Self
where
T: Debug + PartialEq + Eq + OptionalSend,
Self: From<ValueSender<C, T>>,
{
Respond::from(ValueSender::new(res, tx))
}
pub(crate) fn send(self) {
match self {
Respond::Vote(x) => x.send(),
Respond::AppendEntries(x) => x.send(),
Respond::ReceiveSnapshotChunk(x) => x.send(),
Respond::InstallSnapshot(x) => x.send(),
Respond::InstallFullSnapshot(x) => x.send(),
Respond::Initialize(x) => x.send(),
}
}
}
pub(crate) struct ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
value: T,
tx: OneshotSenderOf<C, T>,
}
impl<C, T> Debug for ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ValueSender").field("value", &self.value).finish()
}
}
impl<C, T> PartialEq for ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
fn eq(&self, other: &Self) -> bool {
self.value == other.value
}
}
impl<C, T> Eq for ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
}
impl<C, T> ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
pub(crate) fn new(res: T, tx: OneshotSenderOf<C, T>) -> Self {
Self { value: res, tx }
}
pub(crate) fn value(&self) -> &T {
&self.value
}
pub(crate) fn send(self) {
self.tx.send(self.value).ok();
}
}