pub struct Node { /* private fields */ }
Raft node.
Implementations§
impl Node
pub fn start(id: NodeId) -> Self
Starts a new node.
To create a new cluster, please call Node::create_cluster() after starting the node.
If the node was previously part of a cluster, use Node::restart() instead.
§Examples
use raftbare::{LogPosition, Node, NodeId};
// Starts three nodes.
let mut node0 = Node::start(NodeId::new(0));
let node1 = Node::start(NodeId::new(1));
let node2 = Node::start(NodeId::new(2));
for node in [&node0, &node1, &node2] {
assert!(node.role().is_follower());
assert_eq!(node.config().unique_nodes().count(), 0);
assert_eq!(node.log().last_position(), LogPosition::ZERO);
assert!(node.actions().is_empty());
}
// Creates a new cluster.
node0.create_cluster(&[node0.id(), node1.id(), node2.id()]);
assert!(node0.role().is_candidate());
assert_eq!(node0.config().unique_nodes().count(), 3);
assert_ne!(node0.log().last_position(), LogPosition::ZERO);
assert!(!node0.actions().is_empty());
// [NOTE] To complete the cluster creation, the user needs to handle the queued actions.
pub fn restart(
    id: NodeId,
    current_term: Term,
    voted_for: Option<NodeId>,
    log: Log,
) -> Self
Restarts a node.
current_term, voted_for, and log are restored from persistent storage.
Note that managing the persistent storage is outside the scope of this crate.
§Notes
The Raft algorithm assumes the persistent storage is reliable. So, for example, if the node's local log is corrupted or has lost some tail entries, it is safest to remove the node from the cluster and then add it back as a new node.
In practice, such storage failures are usually tolerable when the majority of nodes in the cluster are healthy (i.e., those nodes can restore their previous state).
Be careful, however: whether this degraded safety guarantee is acceptable depends highly on the application.
§Examples
use raftbare::{Node, NodeId};
// Loads the persistent state.
let current_term = /* ... */;
let voted_for = /* ... */;
let log = /* ... */;
// Restarts a node.
let snapshot_index = log.snapshot_position().index;
let node = Node::restart(NodeId::new(0), current_term, voted_for, log);
assert!(node.role().is_follower());
assert_eq!(node.commit_index(), snapshot_index);
// Unlike `Node::start()`, the restarted node has actions to execute.
assert!(!node.actions().is_empty());
pub fn create_cluster(&mut self, initial_voters: &[NodeId]) -> LogPosition
Creates a new cluster.
This method returns a LogPosition associated with a log entry.
The log entry will be accepted when the initial cluster configuration is successfully committed.
To proceed with the cluster creation, the user needs to handle the queued actions after calling this method.
§Preconditions
This method returns LogPosition::INVALID if the following preconditions are not met:
- This node (self) is a newly started node.
- initial_voters contains at least one node.

Theoretically, it is acceptable to exclude the self node from initial_voters (although it is not practical).
§Notes
Raft algorithm assumes that each node in a cluster belongs to only one cluster at a time.
Therefore, including nodes that are already part of another cluster in the initial_voters
will result in undefined behavior.
pub fn current_term(&self) -> Term
Returns the current term of this node.
pub fn voted_for(&self) -> Option<NodeId>
Returns the identifier of the node for which this node voted in the current term.
If self.role() is not Role::Candidate, the returned node may be the leader of the current term.
pub fn commit_index(&self) -> LogIndex
Returns the commit index of this node.
LogEntry::Command entries up to this index are safely applied to the state machine managed by the user.
pub fn config(&self) -> &ClusterConfig
Returns the current cluster configuration of this node.
This is shorthand for self.log().latest_config().
pub fn peers(&self) -> impl '_ + Iterator<Item = NodeId>
Returns an iterator over the identifiers of the peers of this node.
“Peers” means all unique nodes in the current cluster configuration except for this node.
pub fn actions_mut(&mut self) -> &mut Actions
Returns a mutable reference to the pending actions for this node.
§Note
It is the user’s responsibility to execute these actions.
pub fn propose_command(&mut self) -> LogPosition
Proposes a user-defined command (LogEntry::Command).
This method returns a LogPosition associated with the log entry for the proposed command.
To determine whether the command has been committed, you can use the Node::get_commit_status() method.
Committed commands can be applied to the state machine managed by the user.
Node::get_commit_status() is useful for determining when to send the command result back to the client
that triggered the command (if such a client exists).
To detect all committed commands that need to be applied to the state machine,
it is recommended to use Node::commit_index() since it considers commands proposed by other nodes.
Note that this crate does not manage the content of user-defined commands, so this method takes no arguments. It is the user’s responsibility to map the log index of the proposed command to the actual command data.
§Preconditions
This method returns LogPosition::INVALID if the following preconditions are not met:
- self.role().is_leader() is true.
§Pipelining
Node::propose_command() can be called multiple times before any action is executed.
In such cases, the pending actions are consolidated, reducing the overall I/O cost.
§Examples
use raftbare::{LogEntry, LogIndex, NodeId, Node};
let mut node = /* ... */;
let commit_position = node.propose_command();
if commit_position.is_invalid() {
// `node` is not the leader.
if let Some(maybe_leader) = node.voted_for() {
// Retry with the possible leader or reply to the client that the command is rejected.
// ...
}
return;
}
// Need to map the log index to the actual command data for
// executing `Action::AppendLogEntries(_)` queued by the node.
assert!(node.actions().append_log_entries.is_some());
let index = commit_position.index;
// ... executing actions ...
while node.get_commit_status(commit_position).is_in_progress() {
// ... executing actions ...
}
if node.get_commit_status(commit_position).is_rejected() {
// Retry with another node or reply to the client that the command is rejected.
// ...
return;
}
assert!(node.get_commit_status(commit_position).is_committed());
// Apply all committed commands to the state machine.
let last_applied_index = /* ... */;
for index in (last_applied_index.get() + 1)..=node.commit_index().get() {
let index = LogIndex::new(index);
if node.log().entries().get_entry(index) != Some(LogEntry::Command) {
continue;
}
// Apply the command to the state machine.
// ...
if index == commit_position.index {
// Reply to the client that the command is committed.
// ...
}
}
pub fn propose_config(&mut self, new_config: ClusterConfig) -> LogPosition
Proposes a new cluster configuration (LogEntry::ClusterConfig).
If new_config.new_voters is not empty, the cluster will transition into a joint consensus state.
In this state, leader elections and commit proposals require a majority from both the old and
new voters independently.
Once new_config is committed, a new configuration, which includes only the new voters
(and any non-voters, if any), will be automatically proposed to finalize the joint consensus.
new_config.new_voters does not need to include the self node.
If it does not, the leader self node will transition to a follower
when the final configuration is committed.
Note that a change in new_config.non_voters does not require a joint consensus.
§Preconditions
This method returns LogPosition::INVALID if the following preconditions are not met:
- self.role().is_leader() is true.
- new_config.voters is equal to self.config().voters.
- A node is either a voter or a non-voter in the new configuration (not both).
- self.config().is_joint_consensus() is false (i.e., there is no other configuration change in progress).
§Examples
use raftbare::NodeId;
let mut node = /* ... */;
// Propose a new configuration with adding node 4 and removing node 2.
let new_config = node.config().to_joint_consensus(&[NodeId::new(4)], &[NodeId::new(2)]);
node.propose_config(new_config);
pub fn get_commit_status(&self, position: LogPosition) -> CommitStatus
Returns the commit status of a log entry associated with the given position.
pub fn heartbeat(&mut self) -> bool
Sends a heartbeat (i.e., an empty AppendEntriesCall message) to all followers.
This method returns false if this node is not the leader.
This method can be used to perform consistent queries through the following steps:
- Invoke heartbeat().
- Record the sequence number from the heartbeat message.
- Wait until this node receives a majority of response messages whose sequence numbers are equal to or newer than the recorded one, confirming that this node is still the leader of the cluster.
- Execute the consistent query.
pub fn handle_message(&mut self, msg: Message)
Handles an incoming message from other nodes.
§Examples
let mut node = /* ... */;
let msg = /* ... */;
node.handle_message(msg);
// Execute actions queued by the message handling.
for action in node.actions_mut() {
// ...
}
pub fn handle_election_timeout(&mut self)
Handles an election timeout.
This method is typically invoked when the timeout set by Action::SetElectionTimeout expires.
However, it can also be invoked by other means, such as to trigger a new election
as quickly as possible when the crate user knows there is no leader.
§Examples
let mut node = /* ... */;
node.handle_election_timeout();
// Execute actions queued by the timeout handling.
for action in node.actions_mut() {
// ...
}
pub fn handle_snapshot_installed(
    &mut self,
    last_included_position: LogPosition,
    last_included_config: ClusterConfig,
) -> bool
Updates this node’s log (Log) to reflect the installation of a snapshot.
If the node log contains last_included_position, log entries up to last_included_position are removed.
If last_included_position is greater than the last log position, the log is replaced with an empty log starting at last_included_position.
Note that how snapshots are created and installed is outside the scope of this crate.
§Preconditions
This method returns false and ignores the installation if the following conditions are not met:
- last_included_position is valid, which means:
  - last_included_config is the configuration at last_included_position.index.