pub struct RaftNode {
    pub dynamic_config: Arc<RwLock<DynamicConfig>>,
    pub failover_coordinator: Arc<RwLock<FailoverCoordinator>>,
    /* private fields */
}
A Raft consensus node
Fields
dynamic_config: Arc<RwLock<DynamicConfig>>
Hot-reloadable configuration subset.
The Raft event loop reads heartbeat_interval_ms and
compaction_threshold from this value so that operator changes take
effect on the next tick without a full restart.
Fields that require restart (bind_addr, node_id, peers, …) are NOT
present here; see crate::config::NodeConfig for documentation on
which fields fall into each category.
failover_coordinator: Arc<RwLock<FailoverCoordinator>>
Failover coordinator for leader failure detection and client redirects.
Tracks which node is the current leader and detects leader failures via heartbeat timeout. The coordinator is seeded with the cluster peers at construction time.
Implementations
impl RaftNode
pub fn new(config: RaftConfig) -> RaftResult<Self>
Create a new Raft node
pub fn with_persistence(
    config: RaftConfig,
    persistence: Arc<dyn RaftPersistence>,
) -> RaftResult<Self>
Create a new Raft node with an explicit persistence backend.
Recovers state from the given persistence backend and uses it for all subsequent state and log mutations.
pub fn current_term(&self) -> Term
Get the current term
pub fn commit_index(&self) -> LogIndex
Get the commit index
pub fn last_log_index(&self) -> LogIndex
Get the last log index
pub fn propose(&self, command: Command) -> RaftResult<LogIndex>
Append a command to the log (leader only)
pub fn is_recovering(&self) -> bool
Return true if the node is currently replaying its WAL on startup.
pub fn handle_request_vote(
    &self,
    req: RequestVoteRequest,
) -> RequestVoteResponse
Handle a RequestVote RPC
pub fn handle_append_entries(
    &self,
    req: AppendEntriesRequest,
) -> AppendEntriesResponse
Handle an AppendEntries RPC
pub fn start_election(&self) -> Vec<RequestVoteRequest>
Start an election (transition to candidate)
pub fn handle_vote_response(
    &self,
    from: NodeId,
    resp: RequestVoteResponse,
) -> bool
Handle a vote response during election
pub fn issue_fencing_token(&self) -> Option<FencingToken>
Issue a new fencing token for the current leader term.
Returns None if the node is not the current leader.
pub fn validate_fencing_token(&self, token: &FencingToken) -> RaftResult<()>
Validate that token is not stale relative to the current Raft term.
Returns Ok(()) if the token’s term matches the current Raft term.
Returns Err(RaftError::StaleTerm) if the token predates the current term.
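The term comparison behind the two fencing-token methods can be sketched as follows. This is a minimal illustration under stated assumptions, not the crate's implementation: the `FencingToken` layout (a single `term` field), the free functions `issue` and `validate`, and the one-variant `RaftError` are hypothetical stand-ins. The documentation does not say how a token from a future term is treated; the sketch accepts it.

```rust
/// Hypothetical stand-ins for the crate's types; the real definitions may differ.
pub type Term = u64;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FencingToken {
    /// Term in which the token was issued (assumed field).
    pub term: Term,
}

#[derive(Debug, PartialEq, Eq)]
pub enum RaftError {
    StaleTerm,
}

/// Only a leader hands out tokens; everyone else gets `None`.
pub fn issue(is_leader: bool, current_term: Term) -> Option<FencingToken> {
    if is_leader {
        Some(FencingToken { term: current_term })
    } else {
        None
    }
}

/// A token is stale when it was issued in an earlier term.
/// Assumption: tokens from the current (or a later) term are accepted.
pub fn validate(token: &FencingToken, current_term: Term) -> Result<(), RaftError> {
    if token.term < current_term {
        Err(RaftError::StaleTerm)
    } else {
        Ok(())
    }
}
```

A storage layer that checks the token on every write would reject requests from a deposed leader as soon as it learns of the newer term.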
pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)>
Create heartbeat messages for all peers
pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)>
Create replication messages for all peers
pub fn replicate_to_followers(&self) -> Vec<(NodeId, AppendEntriesRequest)>
Replicate log entries to all followers that need them.
This is a convenience method that combines create_replication_requests()
with the information the caller needs to actually send the RPCs.
Returns a list of (peer_id, request) pairs. If a follower is fully
caught up (its next_index > last_log_index), it is omitted – use
create_heartbeats() for idle keep-alive messages.
Typical usage in a replication loop:
    let requests = leader.replicate_to_followers();
    for (peer, req) in requests {
        let resp = rpc_send(peer, req);
        leader.handle_replication_response(peer, resp)?;
    }
pub fn create_replication_request_for(
    &self,
    peer: NodeId,
) -> Option<AppendEntriesRequest>
Create an AppendEntries request for a specific follower.
Returns None if this node is not the leader, if the follower is
already fully caught up, or if leader state is unavailable.
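The "fully caught up" test used when deciding whether a follower gets an AppendEntries request can be sketched as below. This is an illustrative sketch only: the `next_index` map and the helper name `peers_needing_entries` are hypothetical, and the type aliases are stand-ins for the crate's `NodeId` and `LogIndex`.

```rust
use std::collections::BTreeMap;

// Stand-in aliases; the crate's real types may be newtypes.
pub type NodeId = u64;
pub type LogIndex = u64;

/// Returns the peers that still have entries to receive.
/// A follower whose `next_index` is greater than `last_log_index`
/// is fully caught up and is left out of the result.
pub fn peers_needing_entries(
    next_index: &BTreeMap<NodeId, LogIndex>,
    last_log_index: LogIndex,
) -> Vec<NodeId> {
    let mut out = Vec::new();
    for (peer, next) in next_index {
        if *next <= last_log_index {
            out.push(*peer);
        }
    }
    out
}
```

Peers omitted here would only receive the idle keep-alive messages produced by `create_heartbeats()`.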
pub fn handle_replication_response(
    &self,
    from: NodeId,
    resp: AppendEntriesResponse,
) -> RaftResult<()>
Handle a replication response
pub fn maybe_create_snapshot(
    &self,
    state_machine_data: Vec<u8>,
) -> RaftResult<bool>
Attempt to create a snapshot if the log has grown past the threshold
Call this after advancing the commit index. If a snapshot is created, the log is compacted up to the snapshot point.
state_machine_data is the serialized state of the application state machine
at the current applied index.
pub fn auto_snapshot_if_needed<F>(
    &self,
    policy: &SnapshotPolicy,
    state_machine_data_fn: F,
) -> RaftResult<bool>
Automatically create a snapshot if the log has grown past the configured threshold.
Unlike maybe_create_snapshot, this method uses a SnapshotPolicy to decide
whether to snapshot and takes a closure that produces state machine data on demand
(avoiding the cost of serialization when no snapshot is needed).
Call this after applying committed entries.
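The benefit of passing a closure instead of pre-serialized bytes can be illustrated with a small sketch. Everything here is an assumption made for illustration: the `max_log_entries` field on `SnapshotPolicy`, the `FnOnce() -> Vec<u8>` bound, and the free function `snapshot_if_needed` are hypothetical, since the page does not show the real policy fields or the trait bound on `F`.

```rust
/// Hypothetical policy with a single size threshold.
pub struct SnapshotPolicy {
    pub max_log_entries: u64,
}

/// Returns the serialized state only when the log is over the threshold.
/// The closure is not called otherwise, so no serialization work is done.
pub fn snapshot_if_needed<F>(
    log_len: u64,
    policy: &SnapshotPolicy,
    state_machine_data_fn: F,
) -> Option<Vec<u8>>
where
    F: FnOnce() -> Vec<u8>,
{
    if log_len <= policy.max_log_entries {
        return None;
    }
    Some(state_machine_data_fn())
}
```

With `maybe_create_snapshot`, by contrast, the caller has to produce the `Vec<u8>` before knowing whether it will be used.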
pub fn handle_install_snapshot(
    &self,
    req: InstallSnapshotRequest,
) -> RaftResult<InstallSnapshotResponse>
Handle an InstallSnapshot RPC from the leader
Used when a follower is too far behind and the leader sends a snapshot instead of individual log entries.
pub fn prepare_install_snapshot(
    &self,
    target_peer: NodeId,
) -> RaftResult<Option<InstallSnapshotRequest>>
Prepare an InstallSnapshot request for a follower that is too far behind.
For snapshots at or below snapshot_chunk_threshold_bytes the full data
is loaded into memory and sent as a single done=true request. For
larger snapshots a crate::snapshot::SnapshotStreamer is created (or
resumed) and one chunk is emitted per call; the streamer is automatically
cleaned up once the final chunk is delivered.
Returns Ok(None) when this node is not the leader, when no snapshot
exists, or when the target peer has already caught up past the snapshot
point via normal log replication.
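The single-request versus chunked decision described above might look roughly like the following. This is a sketch under stated assumptions: `SnapshotChunk`, `next_snapshot_chunk`, and the `chunk_bytes` parameter are hypothetical, and the real `SnapshotStreamer` presumably tracks the offset itself rather than taking it as an argument.

```rust
/// Hypothetical payload of one InstallSnapshot request.
#[derive(Debug, PartialEq)]
pub struct SnapshotChunk {
    pub offset: usize,
    pub data: Vec<u8>,
    pub done: bool,
}

/// Produces the next piece of `snapshot` to send, starting at `offset`.
/// Snapshots at or below `chunk_threshold_bytes` are sent whole with
/// `done = true`; larger ones are cut into `chunk_bytes`-sized pieces.
pub fn next_snapshot_chunk(
    snapshot: &[u8],
    offset: usize,
    chunk_threshold_bytes: usize,
    chunk_bytes: usize,
) -> SnapshotChunk {
    if snapshot.len() <= chunk_threshold_bytes {
        return SnapshotChunk { offset: 0, data: snapshot.to_vec(), done: true };
    }
    // Clamp so an out-of-range offset yields an empty final chunk
    // instead of a panic; a zero chunk size is treated as one byte.
    let start = offset.min(snapshot.len());
    let end = start.saturating_add(chunk_bytes.max(1)).min(snapshot.len());
    SnapshotChunk {
        offset: start,
        data: snapshot[start..end].to_vec(),
        done: end == snapshot.len(),
    }
}
```

A caller would advance `offset` by `data.len()` after each acknowledged chunk and stop once `done` is true.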
pub fn follower_needs_snapshot(&self, peer: NodeId) -> bool
Check if a follower needs a snapshot instead of normal log replication
Returns true if the follower’s next_index is at or before the snapshot point.
pub fn add_node(&self, node_id: NodeId, address: String) -> RaftResult<()>
Add a node to the cluster via joint consensus.
If this node is the leader, the change is proposed immediately. Returns an error if a membership change is already in progress or if the node is already a member.
pub fn remove_node(&self, node_id: NodeId) -> RaftResult<()>
Remove a node from the cluster via joint consensus.
If the removed node is this node, it will step down after the configuration change commits.
pub fn cluster_members(&self) -> Vec<(NodeId, String)>
Get the current list of cluster members as (node_id, address) pairs.
pub fn is_in_joint_consensus(&self) -> bool
Check whether the cluster is currently in joint consensus.
pub fn membership_version(&self) -> u64
Get the current membership configuration version.
pub fn propose_membership_change(
    &self,
    change: MembershipChange,
) -> RaftResult<()>
Propose a membership change (leader only).
Implements the simplified Raft joint consensus protocol (Section 6):
- Leader creates joint config C_{old,new} and replicates it.
- Once C_{old,new} is committed the leader creates C_{new}.
- During the joint state, decisions require a majority of both the old and new configurations.
pub fn commit_membership_change(&self) -> RaftResult<()>
Commit the joint consensus transition: move from C_{old,new} to C_{new}.
Call this once the joint config entry has been committed (i.e. acknowledged by a quorum of both old and new configs).
pub fn has_quorum(&self, responding_nodes: &HashSet<NodeId>) -> bool
Calculate whether a set of nodes forms a quorum under the current membership config (handles both stable and joint states).
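The quorum rule for stable and joint configurations can be sketched as follows. This is a minimal illustration, not the crate's code: the `Membership` enum and the helper `majority_of` are hypothetical, and `NodeId` is aliased to `u64` for the example.

```rust
use std::collections::HashSet;

// Stand-in alias; the crate's NodeId may be a different type.
pub type NodeId = u64;

/// Hypothetical view of the membership state.
pub enum Membership {
    /// A single configuration is in force.
    Stable(HashSet<NodeId>),
    /// C_{old,new}: both configurations must agree.
    Joint { old: HashSet<NodeId>, new: HashSet<NodeId> },
}

/// True when strictly more than half of `config` is in `responding`.
fn majority_of(config: &HashSet<NodeId>, responding: &HashSet<NodeId>) -> bool {
    let votes = config.intersection(responding).count();
    votes > config.len() / 2
}

/// Stable state needs one majority; joint state needs a majority of
/// the old configuration and a majority of the new one.
pub fn has_quorum(membership: &Membership, responding: &HashSet<NodeId>) -> bool {
    match membership {
        Membership::Stable(config) => majority_of(config, responding),
        Membership::Joint { old, new } => {
            majority_of(old, responding) && majority_of(new, responding)
        }
    }
}
```

Requiring both majorities is what prevents the old and new configurations from making conflicting decisions during the transition.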
pub fn is_stepping_down(&self) -> bool
Check if this node is stepping down (has been removed from the cluster).
pub fn attach_placement_scheduler(
    self: &Arc<Self>,
    scheduler: PlacementScheduler,
)
Attach an optional shard placement scheduler.
The scheduler is started in the background via tokio::spawn and its
handle is stored so that step_down can stop it
automatically when this node loses leadership. Subsequent calls replace
the previous handle, stopping the old scheduler first.
pub fn election_timeout_elapsed(&self) -> bool
Check if election timeout has elapsed.
The effective timeout is computed from whichever is larger:
- The static RaftConfig::random_election_timeout() from the initial node configuration, or
- 2 × dynamic_config.heartbeat_interval_ms, the Raft-safe lower bound derived from the hot-reloadable heartbeat interval.
This means that updating DynamicConfig::heartbeat_interval_ms at
runtime also extends (or contracts) the election timeout proportionally,
so the invariant election_timeout ≥ 2 × heartbeat_interval is always
maintained even after a hot-reload.
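The "whichever is larger" rule can be written out as a short sketch. The function names and the `since_last_heartbeat` parameter are hypothetical, and the use of `>=` for the elapsed check is an assumption; the page does not say whether the comparison is strict.

```rust
use std::time::Duration;

/// Larger of the static timeout and twice the current heartbeat interval.
pub fn effective_election_timeout(
    static_timeout: Duration,
    heartbeat_interval_ms: u64,
) -> Duration {
    let floor = Duration::from_millis(heartbeat_interval_ms.saturating_mul(2));
    static_timeout.max(floor)
}

/// Assumed elapsed check: time since the last heartbeat has reached
/// the effective timeout.
pub fn election_timeout_elapsed(
    since_last_heartbeat: Duration,
    static_timeout: Duration,
    heartbeat_interval_ms: u64,
) -> bool {
    since_last_heartbeat >= effective_election_timeout(static_timeout, heartbeat_interval_ms)
}
```

For example, with a static timeout of 300 ms, raising the heartbeat interval from 100 ms to 200 ms lifts the effective timeout from 300 ms to 400 ms.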
pub fn reset_election_timer(&self)
Reset election timer
pub fn get_leader_hint(&self) -> Option<NodeId>
Get a hint about the current leader (for client redirection)
pub fn get_dynamic_config(&self) -> DynamicConfig
Return a clone of the current dynamic configuration.
pub fn update_dynamic_config(&self, new_config: DynamicConfig)
Hot-reload the dynamic configuration fields.
This is the primary mechanism for applying operator changes (e.g. via
SIGHUP or an admin RPC) without restarting the node. Only the fields
in DynamicConfig are updated; fields that require a full restart
(bind_addr, node_id, peers, …) cannot be changed here.
The new values take effect on the next Raft event-loop tick —
typically within one heartbeat_interval_ms.
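The hot-reload pattern can be sketched with a value shared behind `Arc<RwLock<...>>`. This is a minimal illustration under stated assumptions: the `DynamicConfig` layout below models only the two fields named in this documentation, and the helpers `read_tick_settings` and `apply_update` are hypothetical stand-ins for the event loop's read and for `update_dynamic_config`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the crate's DynamicConfig.
#[derive(Clone, Debug, PartialEq)]
pub struct DynamicConfig {
    pub heartbeat_interval_ms: u64,
    pub compaction_threshold: u64,
}

/// What an event loop might do at the start of each tick: take the
/// read lock briefly and copy out the values it needs.
pub fn read_tick_settings(cfg: &Arc<RwLock<DynamicConfig>>) -> (u64, u64) {
    let guard = cfg.read().expect("config lock poisoned");
    (guard.heartbeat_interval_ms, guard.compaction_threshold)
}

/// What an operator-triggered reload might do: swap in the new value
/// under the write lock so the next tick observes it.
pub fn apply_update(cfg: &Arc<RwLock<DynamicConfig>>, new_config: DynamicConfig) {
    *cfg.write().expect("config lock poisoned") = new_config;
}
```

Because readers copy the values out on every tick instead of caching them, a change is picked up on the next tick without any restart or notification channel.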
pub fn set_state_machine(
    &self,
    sm: impl StateMachine + 'static,
) -> RaftResult<()>
Register a StateMachine to receive committed log entries.
After this call every committed entry that has not yet been applied will be fed to the state machine the next time the commit index advances. Only one state machine can be active at a time; calling this method again replaces the previous one.
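The catch-up behavior described above, where entries that are committed but not yet applied are fed to the state machine, can be sketched as follows. The `StateMachine` trait shown here is a hypothetical one-method stand-in (the crate's real trait is not documented on this page), and `Recorder` and `apply_committed` exist only for the example.

```rust
pub type LogIndex = u64;

/// Hypothetical one-method trait; the crate's StateMachine may differ.
pub trait StateMachine {
    fn apply(&mut self, index: LogIndex, command: &[u8]);
}

/// Example state machine that records every entry it is given.
#[derive(Default)]
pub struct Recorder {
    pub applied: Vec<(LogIndex, Vec<u8>)>,
}

impl StateMachine for Recorder {
    fn apply(&mut self, index: LogIndex, command: &[u8]) {
        self.applied.push((index, command.to_vec()));
    }
}

/// Applies entries in `(last_applied, commit_index]` in order and
/// returns the new `last_applied`. `log[0]` holds log index 1.
pub fn apply_committed<S: StateMachine>(
    sm: &mut S,
    log: &[Vec<u8>],
    last_applied: LogIndex,
    commit_index: LogIndex,
) -> LogIndex {
    // Never read past the end of the local log.
    let upto = commit_index.min(log.len() as LogIndex);
    let mut index = last_applied;
    while index < upto {
        index += 1;
        sm.apply(index, &log[(index - 1) as usize]);
    }
    index
}
```

Tracking `last_applied` separately from the commit index is what lets a newly registered state machine pick up exactly the entries it has not seen.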
pub fn set_alert_manager(&mut self, am: Arc<AlertManager>)
Attach an AlertManager to this node.
After this call the node will emit AlertEvents to the manager on
significant state changes (leader election, slow replication, etc.).
Only one manager can be active at a time; calling this again replaces
the previous one.
pub fn trigger_failover_election(&self) -> Vec<RequestVoteRequest>
Trigger a failover election if this node is a follower. Returns the vote requests to send to peers, or an empty vec if the node is not in follower state.
Auto Trait Implementations
impl !RefUnwindSafe for RaftNode
impl !UnwindSafe for RaftNode
impl Freeze for RaftNode
impl Send for RaftNode
impl Sync for RaftNode
impl Unpin for RaftNode
impl UnsafeUnpin for RaftNode
Blanket Implementations
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(
    _: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<A, B, T> HttpServerConnExec<A, B> for T
where
    B: Body,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.