openraft 0.10.0-alpha.18

Advanced Raft consensus
Documentation
use std::collections::BTreeMap;
use std::fmt;

use crate::ChangeMembers;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::base::BoxOnce;
use crate::core::raft_msg::external_command::ExternalCommand;
#[cfg(feature = "runtime-stats")]
use crate::core::runtime_stats::RuntimeStats;
use crate::display_ext::DisplayBTreeMapDebugValueExt;
use crate::errors::Infallible;
use crate::errors::InitializeError;
use crate::errors::LinearizableReadError;
use crate::impls::ProgressResponder;
use crate::raft::AppendEntriesRequest;
use crate::raft::ClientWriteResult;
use crate::raft::ReadPolicy;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::linearizable_read::Linearizer;
use crate::raft::responder::core_responder::CoreResponder;
use crate::raft::stream_append::StreamAppendResult;
use crate::type_config::alias::BatchOf;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::EntryPayloadOf;
#[cfg(feature = "runtime-stats")]
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::VoteOf;

pub(crate) mod external_command;
mod raft_msg_name;

pub use raft_msg_name::ExternalCommandName;
pub use raft_msg_name::RaftMsgName;

/// A oneshot TX to send result from `RaftCore` to external caller, e.g. `Raft::append_entries`.
pub(crate) type ResultSender<C, T, E = Infallible> = OneshotSenderOf<C, Result<T, E>>;

/// TX for Vote Response
pub(crate) type VoteTx<C> = OneshotSenderOf<C, VoteResponse<C>>;

/// TX for Append Entries Response
pub(crate) type AppendEntriesTx<C> = OneshotSenderOf<C, StreamAppendResult<C>>;

/// TX for Linearizable Read Response
pub(crate) type ClientReadTx<C> = ResultSender<C, Linearizer<C>, LinearizableReadError<C>>;

/// A message sent by application to the [`RaftCore`].
///
/// [`RaftCore`]: crate::core::RaftCore
pub(crate) enum RaftMsg<C>
where C: RaftTypeConfig
{
    AppendEntries {
        rpc: AppendEntriesRequest<C>,
        tx: AppendEntriesTx<C>,
    },

    RequestVote {
        rpc: VoteRequest<C>,
        tx: VoteTx<C>,
    },

    InstallSnapshot {
        vote: VoteOf<C>,
        snapshot: SnapshotOf<C>,
        tx: OneshotSenderOf<C, SnapshotResponse<C>>,
    },

    /// Begin receiving a snapshot from the leader.
    ///
    /// Returns a snapshot data handle for receiving data.
    ///
    /// It does not check `Vote` because it is a read operation
    /// and does not break raft protocol.
    GetSnapshotReceiver {
        tx: OneshotSenderOf<C, SnapshotDataOf<C>>,
    },

    ClientWrite {
        payloads: BatchOf<C, EntryPayloadOf<C>>,
        responders: BatchOf<C, Option<CoreResponder<C>>>,
        expected_leader: Option<CommittedLeaderIdOf<C>>,
        #[cfg(feature = "runtime-stats")]
        proposed_at: InstantOf<C>,
    },

    GetLinearizer {
        read_policy: ReadPolicy,
        tx: ClientReadTx<C>,
    },

    Initialize {
        members: BTreeMap<C::NodeId, C::Node>,
        tx: ResultSender<C, (), InitializeError<C>>,
    },

    ChangeMembership {
        changes: ChangeMembers<C::NodeId, C::Node>,

        /// If `retain` is `true`, then the voters that are not in the new
        /// config will be converted into learners, otherwise they will be removed.
        retain: bool,

        tx: ProgressResponder<C, ClientWriteResult<C>>,
    },

    WithRaftState {
        req: BoxOnce<'static, RaftState<C>>,
    },

    /// Transfer Leader to another node.
    ///
    /// If this node is `to`, reset Leader lease and start election.
    /// Otherwise, just reset Leader lease so that the node `to` can become Leader.
    HandleTransferLeader {
        /// The vote of the Leader that is transferring the leadership.
        from: VoteOf<C>,
        /// The assigned node to be the next Leader.
        to: C::NodeId,
    },

    ExternalCommand {
        cmd: ExternalCommand<C>,
    },

    /// Get runtime statistics from RaftCore.
    ///
    /// Returns a copy of the current runtime stats.
    #[cfg(feature = "runtime-stats")]
    GetRuntimeStats {
        tx: OneshotSenderOf<C, RuntimeStats<C>>,
    },
}

impl<C: RaftTypeConfig> RaftMsg<C> {
    /// Returns the name of this message variant.
    pub fn name(&self) -> RaftMsgName {
        match self {
            RaftMsg::AppendEntries { .. } => RaftMsgName::AppendEntries,
            RaftMsg::RequestVote { .. } => RaftMsgName::RequestVote,
            RaftMsg::InstallSnapshot { .. } => RaftMsgName::InstallSnapshot,
            RaftMsg::GetSnapshotReceiver { .. } => RaftMsgName::GetSnapshotReceiver,
            RaftMsg::ClientWrite { .. } => RaftMsgName::ClientWrite,
            RaftMsg::GetLinearizer { .. } => RaftMsgName::GetLinearizer,
            RaftMsg::Initialize { .. } => RaftMsgName::Initialize,
            RaftMsg::ChangeMembership { .. } => RaftMsgName::ChangeMembership,
            RaftMsg::HandleTransferLeader { .. } => RaftMsgName::HandleTransferLeader,
            RaftMsg::WithRaftState { .. } => RaftMsgName::WithRaftState,
            RaftMsg::ExternalCommand { cmd } => RaftMsgName::ExternalCommand(cmd.name()),
            #[cfg(feature = "runtime-stats")]
            RaftMsg::GetRuntimeStats { .. } => RaftMsgName::GetRuntimeStats,
        }
    }
}

impl<C> fmt::Display for RaftMsg<C>
where C: RaftTypeConfig
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RaftMsg::AppendEntries { rpc, .. } => {
                write!(f, "AppendEntries: {}", rpc)
            }
            RaftMsg::RequestVote { rpc, .. } => {
                write!(f, "RequestVote: {}", rpc)
            }
            RaftMsg::GetSnapshotReceiver { .. } => {
                write!(f, "GetSnapshotReceiver")
            }
            RaftMsg::InstallSnapshot { vote, snapshot, .. } => {
                write!(f, "InstallSnapshot: vote: {}, snapshot: {}", vote, snapshot)
            }
            RaftMsg::ClientWrite { .. } => write!(f, "ClientWrite"),
            RaftMsg::GetLinearizer { read_policy, .. } => {
                write!(f, "GetLinearizer: {}", read_policy)
            }
            RaftMsg::Initialize { members, .. } => {
                write!(f, "Initialize: {}", members.display())
            }
            RaftMsg::ChangeMembership { changes, retain, .. } => {
                write!(f, "ChangeMembership: {}, retain: {}", changes, retain)
            }
            RaftMsg::WithRaftState { .. } => write!(f, "WithRaftState"),
            RaftMsg::HandleTransferLeader { from, to } => {
                write!(f, "TransferLeader: from_leader: vote={}, to: {}", from, to)
            }
            RaftMsg::ExternalCommand { cmd } => {
                write!(f, "ExternalCommand: {}", cmd)
            }
            #[cfg(feature = "runtime-stats")]
            RaftMsg::GetRuntimeStats { .. } => {
                write!(f, "GetRuntimeStats")
            }
        }
    }
}