pub struct Raft<C>
where
    C: RaftTypeConfig,
{ /* private fields */ }
The Raft API.
This type implements the full Raft spec, and is the interface to a running Raft node. Applications building on top of Raft will use this to spawn a Raft task and interact with the spawned task.
For more information on the Raft protocol, see the specification here (pdf warning).
§Clone
This type implements Clone. Cloning it is very cheap, which makes it easy to share a handle across async workflows.
§Shutting down
If any of the interfaces returns a RaftError::Fatal, this indicates that the Raft node is shutting down. If the parent application needs to shut down the Raft node for any reason, call shutdown.
Implementations§
impl<C> Raft<C>
where
    C: RaftTypeConfig<Responder = OneshotResponder<C>>,
Implements the blocking-mode write operations, which rely on a oneshot channel for communication between the Raft core and the client.
pub async fn change_membership(
    &self,
    members: impl Into<ChangeMembers<C::NodeId, C::Node>>,
    retain: bool
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
Propose a cluster configuration change.
A node in the proposed config must already be a learner; otherwise the call fails with a LearnerNotFound error.
Internally:
- It proposes a joint config.
- When the joint config is committed, it proposes a uniform config.
If retain is true, all members that do not exist in the new membership are turned into learners; otherwise they are removed.
Example of retain usage:
If the original membership is {“voters”:{1,2,3}, “learners”:{}} and change_membership is called with voters {3,4,5}, then:
- If retain is true, the committed new membership is {“voters”:{3,4,5}, “learners”:{1,2}}.
- Otherwise, if retain is false, the new membership is {“voters”:{3,4,5}, “learners”:{}}; the voters that do not exist in the new membership are simply removed from the cluster.
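The retain rule can be modelled with plain sets. The following is a minimal sketch, not the crate's implementation: Membership and apply_change are hypothetical names, node IDs are assumed to be u64, and the sketch assumes that existing learners are left untouched, which the text above does not state.

```rust
use std::collections::BTreeSet;

/// Toy membership: a set of voter IDs and a set of learner IDs.
/// (Hypothetical type for illustration only.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Membership {
    pub voters: BTreeSet<u64>,
    pub learners: BTreeSet<u64>,
}

/// Compute the final (uniform) membership after replacing the voter set.
/// This models only the `retain` rule, not the intermediate joint config.
pub fn apply_change(old: &Membership, new_voters: &BTreeSet<u64>, retain: bool) -> Membership {
    // Assumption: existing learners stay learners unless they are promoted to voters.
    let mut learners: BTreeSet<u64> = old.learners.difference(new_voters).copied().collect();
    if retain {
        // Voters that are absent from the new voter set are demoted to learners.
        learners.extend(old.voters.difference(new_voters).copied());
    }
    // With `retain == false` the absent voters appear in neither set: they are removed.
    Membership {
        voters: new_voters.clone(),
        learners,
    }
}
```

With voters {1,2,3} replaced by {3,4,5}, this yields learners {1,2} when retain is true and no learners when it is false, matching the example above.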
If the leader loses leadership or crashes before committing the second (uniform) config log, the cluster is left in the joint config.
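To see why a cluster left in the joint config is still safe, it helps to recall the general Raft joint-consensus rule: while a joint config is in effect, an entry needs a majority of every constituent voter set. The sketch below illustrates that rule only; is_majority and is_joint_quorum are hypothetical helpers and do not describe this crate's internals.

```rust
use std::collections::BTreeSet;

/// Returns true if `acked` contains a strict majority of `config`.
pub fn is_majority(config: &BTreeSet<u64>, acked: &BTreeSet<u64>) -> bool {
    let granted = config.intersection(acked).count();
    granted * 2 > config.len()
}

/// In a joint config, an entry counts as committed only when a majority of
/// every constituent voter set has acknowledged it.
pub fn is_joint_quorum(configs: &[BTreeSet<u64>], acked: &BTreeSet<u64>) -> bool {
    configs.iter().all(|c| is_majority(c, acked))
}
```

For the joint config [{1,2,3}, {3,4,5}], acknowledgements from {1,2,3} are not enough (only node 3 belongs to the new set), while {2,3,4} satisfies both majorities.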
pub async fn add_learner(
    &self,
    id: C::NodeId,
    node: C::Node,
    blocking: bool
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
Add a new learner Raft node, optionally blocking until it is up to speed.
- Add a node as a learner into the cluster.
- Set up replication from the leader to it.
If blocking is true, this function blocks until the leader believes the logs on the new node are up to date, i.e., the node is ready to join the cluster as a voter by calling change_membership.
If blocking is false, this function returns as soon as replication has been set up successfully.
If the node to add is already a voter or learner, it will still be re-added.
The node argument (a C::Node) can store the network address of the node. Thus an application does not need a separate store for mapping node-id to ip-addr when implementing the RaftNetwork.
impl<C> Raft<C>
where
    C: RaftTypeConfig,
pub async fn new<LS, N, SM>(
    id: C::NodeId,
    config: Arc<Config>,
    network: N,
    log_store: LS,
    state_machine: SM
) -> Result<Self, Fatal<C::NodeId>>
Create and spawn a new Raft task.
§id
The ID which the spawned Raft task will use to identify itself within the cluster. Applications must guarantee that the ID provided to this function is stable, and should be persisted in a well known location, probably alongside the Raft log and the application’s state machine. This ensures that restarts of the node will yield the same ID every time.
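One way to keep the ID stable across restarts is to write it to a file on first start and read it back afterwards. This is a minimal sketch under stated assumptions: load_or_init_node_id, the file path, and the u64 ID type are all hypothetical, and a production version would also need to fsync the file and its directory.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Load the node ID from `path`, or persist `fallback` there on first start.
/// `path` and `fallback` are hypothetical; choose whatever fits the deployment.
pub fn load_or_init_node_id(path: &Path, fallback: u64) -> io::Result<u64> {
    match fs::read_to_string(path) {
        // The file exists: parse the stored ID and ignore `fallback`.
        Ok(text) => text
            .trim()
            .parse::<u64>()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
        // First start: write the ID so that later restarts see the same value.
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            fs::write(path, fallback.to_string())?;
            Ok(fallback)
        }
        // Any other I/O error is reported to the caller unchanged.
        Err(e) => Err(e),
    }
}
```

The returned value is what would be passed as id; a second call with a different fallback still returns the ID stored by the first call.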
§config
Raft’s runtime config. See the docs on the Config object for more details.
§network
An implementation of the RaftNetworkFactory trait which will be used by Raft for sending RPCs to peer nodes within the cluster. See the docs on the RaftNetworkFactory trait for more details.
§storage
An implementation of the RaftStorage trait which will be used by Raft for data storage. See the docs on the RaftStorage trait for more details.
pub fn runtime_config(&self) -> RuntimeConfigHandle<'_, C>
Return a handle to update the runtime config, such as enabling or disabling heartbeat, election, etc.
Example:
let raft = Raft::new(...).await?;
raft.runtime_config().heartbeat(true);
pub fn enable_tick(&self, enabled: bool)
👎Deprecated since 0.8.4: use Raft::runtime_config().tick() instead
Enable or disable the Raft internal ticker.
pub fn enable_heartbeat(&self, enabled: bool)
👎Deprecated: use Raft::runtime_config().heartbeat() instead
pub fn enable_elect(&self, enabled: bool)
👎Deprecated: use Raft::runtime_config().elect() instead
pub fn trigger(&self) -> Trigger<'_, C>
Return a handle to manually trigger raft actions, such as elect or build snapshot.
Example:
let raft = Raft::new(...).await?;
raft.trigger().elect().await?;
pub async fn trigger_elect(&self) -> Result<(), Fatal<C::NodeId>>
👎Deprecated since 0.8.4: use Raft::trigger().elect() instead
Trigger an election at once and return at once.
pub async fn trigger_heartbeat(&self) -> Result<(), Fatal<C::NodeId>>
👎Deprecated since 0.8.4: use Raft::trigger().heartbeat() instead
Trigger a heartbeat at once and return at once.
pub async fn trigger_snapshot(&self) -> Result<(), Fatal<C::NodeId>>
👎Deprecated since 0.8.4: use Raft::trigger().snapshot() instead
Trigger building a snapshot at once and return at once.
pub async fn purge_log(&self, upto: u64) -> Result<(), Fatal<C::NodeId>>
👎Deprecated since 0.8.4: use Raft::trigger().purge_log() instead
Initiate the log purge up to and including the given upto log index.
pub async fn append_entries(
    &self,
    rpc: AppendEntriesRequest<C>
) -> Result<AppendEntriesResponse<C::NodeId>, RaftError<C::NodeId>>
Submit an AppendEntries RPC to this Raft node.
These RPCs are sent by the cluster leader to replicate log entries (§5.3), and are also used as heartbeats (§5.2).
pub async fn vote(
    &self,
    rpc: VoteRequest<C::NodeId>
) -> Result<VoteResponse<C::NodeId>, RaftError<C::NodeId>>
Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node.
These RPCs are sent by cluster peers which are in candidate state attempting to gather votes (§5.2).
pub async fn get_snapshot(
    &self
) -> Result<Option<Snapshot<C>>, RaftError<C::NodeId>>
Get the latest snapshot from the state machine.
It returns an error only when RaftCore fails to serve the request, e.g., when encountering a storage error or shutting down.
pub async fn begin_receiving_snapshot(
    &self
) -> Result<Box<<C as RaftTypeConfig>::SnapshotData>, RaftError<C::NodeId, Infallible>>
Get a snapshot data object for receiving a snapshot from the leader.
pub async fn install_full_snapshot(
    &self,
    vote: Vote<C::NodeId>,
    snapshot: Snapshot<C>
) -> Result<SnapshotResponse<C::NodeId>, Fatal<C::NodeId>>
Install a completely received snapshot to the state machine.
This method is used to implement a fully application-defined snapshot transmission. The application receives a snapshot from the leader, in chunks or as a stream, rebuilds the snapshot, and then passes it to Raft to install.
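The receiving side of such an application-defined transport might look like the sketch below. Everything in it is hypothetical (Chunk, SnapshotAssembler, the offset and done fields, and the in-order delivery assumption); it only shows how chunks could be rebuilt into one buffer before the result is handed to Raft.

```rust
/// One piece of a snapshot, as a hypothetical application-defined transport might deliver it.
pub struct Chunk {
    pub offset: usize,
    pub data: Vec<u8>,
    pub done: bool,
}

/// Accumulates chunks in order and yields the full snapshot bytes once complete.
#[derive(Default)]
pub struct SnapshotAssembler {
    buf: Vec<u8>,
}

impl SnapshotAssembler {
    /// Returns `Ok(Some(bytes))` when the last chunk arrives, `Ok(None)` while
    /// more chunks are expected, and `Err` on an out-of-order chunk.
    pub fn push(&mut self, chunk: Chunk) -> Result<Option<Vec<u8>>, String> {
        // Assumption: chunks arrive in order, so the offset must match what we hold.
        if chunk.offset != self.buf.len() {
            return Err(format!(
                "expected offset {}, got {}",
                self.buf.len(),
                chunk.offset
            ));
        }
        self.buf.extend_from_slice(&chunk.data);
        if chunk.done {
            // Hand the rebuilt snapshot to the caller and reset for the next transfer.
            Ok(Some(std::mem::take(&mut self.buf)))
        } else {
            Ok(None)
        }
    }
}
```

Once push returns the complete bytes, the application would wrap them in its own snapshot type and pass that to install_full_snapshot.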
pub async fn install_snapshot(
    &self,
    req: InstallSnapshotRequest<C>
) -> Result<InstallSnapshotResponse<C::NodeId>, RaftError<C::NodeId, InstallSnapshotError>>
👎Deprecated since 0.9.0: with generic-snapshot-shot enabled, use Raft::install_full_snapshot() instead
Receive an InstallSnapshotRequest.
These RPCs are sent by the cluster leader in order to bring a new node or a slow node up-to-speed with the leader.
If receiving is finished (done == true), it installs the snapshot to the state machine.
Nothing will be done if the input snapshot is older than the state machine.
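The "older than the state machine" check can be pictured as a comparison of log positions. This is a sketch under assumptions: LogPos and should_install are hypothetical, positions are ordered by term and then index, and treating an equal position as "nothing to do" is a choice made here, not something the text above specifies.

```rust
/// A log position, ordered by term first and then by index
/// (the derived ordering follows field order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LogPos {
    pub term: u64,
    pub index: u64,
}

/// Decide whether a received snapshot should replace the state machine.
/// `None` means "nothing applied yet" and sorts below every `Some(_)`.
pub fn should_install(snapshot_last: Option<LogPos>, last_applied: Option<LogPos>) -> bool {
    // Only a strictly newer snapshot is installed; an equal or older one is skipped.
    snapshot_last > last_applied
}
```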
pub async fn current_leader(&self) -> Option<C::NodeId>
Get the ID of the current leader from this Raft node.
This method is based on the Raft metrics system, which does a good job of staying up to date; however, the is_leader method must still be used to guard against stale reads. This method is well suited to deciding where to route client requests.
pub async fn is_leader(
    &self
) -> Result<(), RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>>
👎Deprecated since 0.9.0: use Raft::ensure_linearizable() instead
Check to ensure this node is still the cluster leader, in order to guard against stale reads (§8).
The actual read operation itself is up to the application; this method just ensures that the read will not be stale.
pub async fn ensure_linearizable(
    &self
) -> Result<Option<LogId<C::NodeId>>, RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>>
Ensures that a read operation performed after this method returns is linearizable across the cluster.
This method is just a shorthand for calling get_read_log_id() and then calling Raft::wait.
This method confirms the node’s leadership at the time of invocation by sending heartbeats to a quorum of followers, and confirms that the state machine is up to date. It blocks until all these conditions are met.
Returns:
- Ok(read_log_id) on successful confirmation that the node is the leader. read_log_id represents the log id up to which the state machine has applied to ensure a linearizable read.
- Err(RaftError<CheckIsLeaderError>) if it detects a higher term, or if it fails to communicate with a quorum of followers.
§Examples
my_raft.ensure_linearizable().await?;
// Proceed with the state machine read
Read more about how it works: Read Operation
pub async fn get_read_log_id(
    &self
) -> Result<(Option<LogId<C::NodeId>>, Option<LogId<C::NodeId>>), RaftError<C::NodeId, CheckIsLeaderError<C::NodeId, C::Node>>>
Ensures this node is leader and returns the log id up to which the state machine should apply to ensure a read can be linearizable across the cluster.
The leadership is ensured by sending heartbeats to a quorum of followers.
Note that this is just the first step of a linearizable read. The second step is to wait for the state machine to reach the returned read_log_id.
Returns:
- Ok((read_log_id, last_applied_log_id)) on successful confirmation that the node is the leader. read_log_id represents the log id up to which the state machine should apply to ensure a linearizable read.
- Err(RaftError<CheckIsLeaderError>) if it detects a higher term, or if it fails to communicate with a quorum of followers.
The caller should then wait for last_applied_log_id to catch up, which can be done by subscribing to Raft::metrics and waiting for last_applied_log_id to reach read_log_id.
§Examples
let (read_log_id, applied_log_id) = my_raft.get_read_log_id().await?;
if read_log_id.index() > applied_log_id.index() {
    my_raft.wait(None).applied_index_at_least(read_log_id.index()).await?;
}
// Proceed with the state machine read
The comparison read_log_id > applied_log_id would also be valid in the above example.
See: Read Operation
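The second step of the read, deciding whether the caller still has to wait, comes down to comparing two optional log ids. The sketch below models that with a stand-in LogId type (hypothetical, ordered by term then index) and a hypothetical helper index_to_wait_for; it relies on the standard ordering of Option, where None sorts below every Some.

```rust
/// Stand-in for a log id, ordered by term first and then by index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct LogId {
    pub term: u64,
    pub index: u64,
}

/// Returns the log index the state machine must reach before the read may
/// proceed, or `None` if the read can be served immediately.
pub fn index_to_wait_for(read_log_id: Option<LogId>, applied: Option<LogId>) -> Option<u64> {
    if read_log_id > applied {
        // `read_log_id` is necessarily `Some` here, because `None` is the minimum.
        read_log_id.map(|id| id.index)
    } else {
        None
    }
}
```

A Some result corresponds to the branch in the example above that calls wait(None).applied_index_at_least(..); a None result means the state machine has already applied far enough.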
pub async fn client_write<E>(
    &self,
    app_data: C::D
) -> Result<ClientWriteResponse<C>, RaftError<C::NodeId, ClientWriteError<C::NodeId, C::Node>>>
where
    <<C as RaftTypeConfig>::Responder as Responder<C>>::Receiver: Future<Output = Result<ClientWriteResult<C>, E>>,
    E: Error + OptionalSend,
Submit a mutating client request to Raft to update the state of the system (§5.1).
It will be appended to the log, committed to the cluster, and then applied to the application state machine. The result of applying the request to the state machine will be returned as the response from this method.
Our goal for Raft is to implement linearizable semantics. If the leader crashes after committing a log entry but before responding to the client, the client may retry the command with a new leader, causing it to be executed a second time. As such, clients should assign unique serial numbers to every command. Then, the state machine should track the latest serial number processed for each client, along with the associated response. If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request (§8). The RaftStorage::apply_entry_to_state_machine method is the perfect place to implement this.
These are application specific requirements, and must be implemented by the application which is being built on top of Raft.
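The serial-number scheme described above might be implemented along the following lines. This is an illustrative sketch only: StateMachine, its counter payload, and the u64 client and serial types are hypothetical, and it assumes each client issues one request at a time with increasing serial numbers.

```rust
use std::collections::HashMap;

/// Toy state machine: a counter plus a per-client record of the last request applied.
#[derive(Default)]
pub struct StateMachine {
    pub counter: i64,
    /// client id -> (latest serial number processed, response returned for it)
    last_applied: HashMap<u64, (u64, i64)>,
}

impl StateMachine {
    /// Apply "add `delta`" on behalf of `client`, at most once per `serial`.
    pub fn apply(&mut self, client: u64, serial: u64, delta: i64) -> i64 {
        if let Some(&(seen, response)) = self.last_applied.get(&client) {
            if serial <= seen {
                // A retry of a command that already ran: answer from the cache
                // without executing it again.
                return response;
            }
        }
        // First time this serial is seen: execute and remember the response.
        self.counter += delta;
        self.last_applied.insert(client, (serial, self.counter));
        self.counter
    }
}
```

Because the dedup table lives inside the state machine, it is replicated and snapshotted together with the application data, so a new leader answers retries the same way the old one would have.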
pub async fn client_write_ff(
    &self,
    app_data: C::D
) -> Result<<<C as RaftTypeConfig>::Responder as Responder<C>>::Receiver, Fatal<C::NodeId>>
Submit a mutating client request to Raft to update the state machine, and return an application-defined response receiver, Responder::Receiver.
_ff means fire and forget.
It is the same as Raft::client_write but does not wait for the response.
pub async fn is_initialized(&self) -> Result<bool, Fatal<C::NodeId>>
Return true if this node is already initialized and cannot be initialized again with Raft::initialize.
pub async fn initialize<T>(
    &self,
    members: T
) -> Result<(), RaftError<C::NodeId, InitializeError<C::NodeId, C::Node>>>
Initialize a pristine Raft node with the given config.
This command should be called on pristine nodes, where the log index is 0 and the node is in Learner state. If either of those constraints is false, it indicates that the cluster is already formed and in motion. If InitializeError::NotAllowed is returned from this function, it is safe to ignore, as it simply indicates that the cluster is already up and running, which is ultimately the goal of this function.
This command will work for single-node or multi-node cluster formation. This command should be called with all discovered nodes which need to be part of cluster, and as such it is recommended that applications be configured with an initial cluster formation delay which will allow time for the initial members of the cluster to be discovered (by the parent application) for this call.
Once a node is successfully initialized, it commits a new membership config log entry to the store. Then it starts to work, i.e., it enters Candidate state and tries to elect itself as the leader.
More than one node performing initialize() with the same config is safe; performing it with different configs will result in a split-brain condition.
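The "pristine node" precondition and the advice to ignore NotAllowed can be sketched as follows. The types here (ServerState, InitError) and both functions are simplified, hypothetical stand-ins rather than the crate's real definitions.

```rust
/// Simplified stand-in for the server state of a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    Learner,
    Follower,
    Candidate,
    Leader,
}

/// Simplified stand-in for an initialization error.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    NotAllowed,
    Other(String),
}

/// Initialization is allowed only on a pristine node:
/// log index 0 and Learner state.
pub fn try_initialize(last_log_index: u64, state: ServerState) -> Result<(), InitError> {
    if last_log_index == 0 && state == ServerState::Learner {
        Ok(())
    } else {
        Err(InitError::NotAllowed)
    }
}

/// Treat "already initialized" as success and pass every other outcome through.
pub fn ignore_not_allowed(res: Result<(), InitError>) -> Result<(), InitError> {
    match res {
        Err(InitError::NotAllowed) => Ok(()),
        other => other,
    }
}
```

Wrapping the call this way lets every node in a bootstrap script attempt initialization without the script failing on nodes that have already joined a running cluster.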
pub async fn with_raft_state<F, V>(
    &self,
    func: F
) -> Result<V, Fatal<C::NodeId>>
where
    F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>) -> V + OptionalSend + 'static,
    V: OptionalSend + 'static,
Provides read-only access to RaftState through a user-provided function.
The function func is applied to the current RaftState. The result of this function, of type V, is returned wrapped in Result<V, Fatal<C::NodeId>>.
A Fatal error is returned if no reply can be received from RaftCore, that is, if:
- The Raft core task has stopped normally.
- The Raft core task has panicked due to a programming error.
- The Raft core task has encountered a storage error.
Example for getting the current committed log id:
let committed = my_raft.with_raft_state(|st| st.committed).await?;
pub fn external_request<F>(&self, req: F)
where
    F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>) + OptionalSend + 'static,
Send a request to the Raft core loop in a fire-and-forget manner.
The request functor will be called with a mutable reference to both the state machine and the network factory and serialized with other Raft core loop processing (e.g., client requests or general state changes). The current state of the system is passed as well.
If a response is required, then the caller can store the sender of a one-shot channel in the closure of the request functor, which can then be used to send the response asynchronously.
If the API channel is already closed (Raft is in shutdown), then the request functor is destroyed right away and not called at all.
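The pattern of storing a reply sender inside the request closure can be shown with standard-library channels and a thread standing in for the core loop. This is a model of the idea, not of the crate: CoreState, Request, spawn_core, and read_committed are hypothetical names, and a real caller would use an async oneshot channel rather than std::sync::mpsc.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for the state owned by the core loop.
#[derive(Debug, Default)]
pub struct CoreState {
    pub committed: u64,
}

/// A request is a boxed closure that the core loop runs against its state.
pub type Request = Box<dyn FnOnce(&CoreState) + Send + 'static>;

/// Spawn a toy core loop that runs queued requests one at a time.
pub fn spawn_core(state: CoreState) -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // The loop ends once every sender has been dropped.
        for req in rx {
            req(&state);
        }
    });
    tx
}

/// Submit a fire-and-forget request whose closure carries the reply sender.
pub fn read_committed(core: &mpsc::Sender<Request>) -> Option<u64> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let req: Request = Box::new(move |st: &CoreState| {
        // Send the answer back; ignore the error if the caller has gone away.
        let _ = reply_tx.send(st.committed);
    });
    // If the core loop is gone, the request is dropped without being called.
    core.send(req).ok()?;
    // A dropped reply sender (closure never ran) surfaces here as `None`.
    reply_rx.recv().ok()
}
```

When the channel to the core loop is already closed, the closure is destroyed together with its reply sender, so the waiting side observes a closed channel instead of hanging.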
pub fn metrics(&self) -> Receiver<RaftMetrics<C::NodeId, C::Node>>
Get a handle to the metrics channel.
pub fn data_metrics(&self) -> Receiver<RaftDataMetrics<C::NodeId>>
Get a handle to the data metrics channel.
pub fn server_metrics(&self) -> Receiver<RaftServerMetrics<C::NodeId, C::Node>>
Get a handle to the server metrics channel.
pub fn wait(
    &self,
    timeout: Option<Duration>
) -> Wait<C::NodeId, C::Node, C::AsyncRuntime>
Get a handle to wait for the metrics to satisfy some condition.
If timeout is None, then it will wait forever (10 years).
If timeout is Some, then it will wait for the specified duration.
let timeout = Duration::from_millis(200);
// wait for raft log-3 to be received and applied:
r.wait(Some(timeout)).log(Some(3), "log").await?;
// wait forever for raft node's current leader to become 2:
r.wait(None).current_leader(2, "wait for leader").await?;
// wait for raft state to become a follower
r.wait(None).state(State::Follower, "state").await?;
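The timeout behaviour, where None is mapped to a very long wait, can be modelled with a mutex and a condition variable. This is a synchronous sketch of the idea only: Watched, wait_for, and FOREVER are hypothetical names, and the real handle is asynchronous and watches the metrics channel.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Stand-in for "forever": roughly ten years.
const FOREVER: Duration = Duration::from_secs(10 * 365 * 24 * 60 * 60);

/// A shared value that notifies waiters on every update.
#[derive(Default)]
pub struct Watched {
    value: Mutex<u64>,
    changed: Condvar,
}

impl Watched {
    /// Publish a new value and wake every waiter.
    pub fn set(&self, v: u64) {
        *self.value.lock().unwrap() = v;
        self.changed.notify_all();
    }

    /// Block until `pred` holds or the timeout elapses.
    /// `Ok` carries the value that satisfied `pred`; `Err` carries the last value seen.
    pub fn wait_for(
        &self,
        timeout: Option<Duration>,
        pred: impl Fn(u64) -> bool,
    ) -> Result<u64, u64> {
        let guard = self.value.lock().unwrap();
        // Keep waiting while the predicate is NOT yet satisfied.
        let (guard, res) = self
            .changed
            .wait_timeout_while(guard, timeout.unwrap_or(FOREVER), |v| !pred(*v))
            .unwrap();
        if res.timed_out() {
            Err(*guard)
        } else {
            Ok(*guard)
        }
    }
}
```

Mapping None to a large finite duration keeps a single code path for both cases, at the cost of the wait not being literally unbounded.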
pub async fn shutdown(
    &self
) -> Result<(), <C::AsyncRuntime as AsyncRuntime>::JoinError>
Shut down this Raft node.
It sends a shutdown signal and waits until RaftCore returns.