use crate::config::RaftConfig;
use crate::error::{Error, Result};
use crate::log::{MemoryLog, RaftLog};
use crate::message::{AppendEntries, AppendEntriesReply, Message, RequestVote, RequestVoteReply};
use crate::rng::Rng;
use crate::types::{HardState, Index, LogEntry, NodeId, Role, Term};
pub enum Event {
Tick,
Message(Message),
Propose(Vec<u8>),
}
/// An output of [`RaftNode::step`] that the caller is responsible for carrying out.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum Action {
    /// Deliver `message` to node `to`.
    Send {
        /// Destination node id.
        to: NodeId,
        /// Protocol message to deliver.
        message: Message,
    },
    /// Apply a committed log entry to the caller's state machine.
    ///
    /// The node emits these in strictly increasing `index` order.
    Apply {
        /// Log index of the committed entry.
        index: Index,
        /// Term in which the entry was appended.
        term: Term,
        /// Opaque command bytes originally passed to `Event::Propose`.
        command: Vec<u8>,
    },
}
/// A single Raft participant, driven entirely through [`RaftNode::step`].
///
/// The node is a pure state machine: it consumes [`Event`]s and returns
/// [`Action`]s, and only touches the outside world through its log `L`.
pub struct RaftNode<L: RaftLog = MemoryLog> {
    // --- static configuration ---
    /// This node's id.
    id: NodeId,
    /// Ids of the other cluster members.
    peers: Vec<NodeId>,
    /// Number of votes needed for a strict majority (peers plus self).
    quorum: usize,
    /// Lower bound of the randomized election timeout, in ticks.
    election_timeout_min: u32,
    /// Upper bound of the randomized election timeout, in ticks.
    election_timeout_max: u32,
    /// Ticks between leader heartbeats.
    heartbeat_interval: u32,

    // --- storage ---
    /// Replicated log plus persisted hard state (term and vote).
    log: L,

    // --- volatile protocol state ---
    /// Current role in the protocol.
    role: Role,
    /// Latest term this node has seen (mirrored into the log's hard state).
    current_term: Term,
    /// Candidate voted for in `current_term`, if any (also persisted).
    voted_for: Option<NodeId>,
    /// Leader known for `current_term`, if any.
    leader_id: Option<NodeId>,
    /// Highest log index known to be committed.
    commit_index: Index,
    /// Highest log index already emitted as an `Action::Apply`.
    last_applied: Index,

    // --- timers and elections ---
    /// Ticks since the election timer was last reset.
    election_elapsed: u32,
    /// Ticks since the last heartbeat broadcast (leader only).
    heartbeat_elapsed: u32,
    /// Current randomized election timeout, in ticks.
    election_timeout: u32,
    /// Distinct nodes (including self) that granted a vote this term.
    votes: Vec<NodeId>,
    /// Deterministic RNG used to randomize election timeouts.
    rng: Rng,
}
impl RaftNode<MemoryLog> {
    /// Creates a node whose log and hard state live in a fresh [`MemoryLog`].
    ///
    /// Equivalent to `RaftNode::with_log(config, MemoryLog::new())`.
    #[must_use]
    pub fn new(config: RaftConfig) -> Self {
        let fresh_log = MemoryLog::new();
        Self::with_log(config, fresh_log)
    }
}
impl<L: RaftLog> RaftNode<L> {
    /// Builds a node from `config`, resuming from the hard state held by `log`.
    ///
    /// The node starts as a [`Role::Follower`] with no known leader. Its term and
    /// vote are read back from `log.hard_state()`, while `commit_index` and
    /// `last_applied` restart at 0. The quorum is a strict majority of
    /// `config.peers` plus this node, and the first election timeout is drawn
    /// from the configured range using an RNG seeded with `config.seed`.
    #[must_use]
    pub fn with_log(config: RaftConfig, log: L) -> Self {
        let hard = log.hard_state();
        // NOTE(review): presumes `config.peers` excludes this node's own id and
        // has no duplicates; neither is checked here.
        let cluster_size = config.peers.len() + 1;
        let quorum = cluster_size / 2 + 1;
        let mut rng = Rng::new(config.seed);
        let election_timeout =
            rng.gen_range(config.election_timeout_min, config.election_timeout_max);
        Self {
            id: config.id,
            peers: config.peers,
            quorum,
            election_timeout_min: config.election_timeout_min,
            election_timeout_max: config.election_timeout_max,
            heartbeat_interval: config.heartbeat_interval,
            log,
            role: Role::Follower,
            current_term: hard.term,
            voted_for: hard.voted_for,
            leader_id: None,
            commit_index: 0,
            last_applied: 0,
            election_elapsed: 0,
            heartbeat_elapsed: 0,
            election_timeout,
            votes: Vec::new(),
            rng,
        }
    }

    /// This node's id.
    #[inline]
    #[must_use]
    pub fn id(&self) -> NodeId {
        self.id
    }

    /// Current role (follower, candidate or leader).
    #[inline]
    #[must_use]
    pub fn role(&self) -> Role {
        self.role
    }

    /// Whether this node currently believes it is the leader.
    #[inline]
    #[must_use]
    pub fn is_leader(&self) -> bool {
        self.role == Role::Leader
    }

    /// Latest term this node has seen.
    #[inline]
    #[must_use]
    pub fn term(&self) -> Term {
        self.current_term
    }

    /// Leader known for the current term, if any.
    #[inline]
    #[must_use]
    pub fn leader(&self) -> Option<NodeId> {
        self.leader_id
    }

    /// Highest log index known to be committed.
    #[inline]
    #[must_use]
    pub fn commit_index(&self) -> Index {
        self.commit_index
    }

    /// Highest log index already emitted as an [`Action::Apply`].
    #[inline]
    #[must_use]
    pub fn last_applied(&self) -> Index {
        self.last_applied
    }

    /// Read-only access to the underlying log.
    #[inline]
    #[must_use]
    pub fn log(&self) -> &L {
        &self.log
    }

    /// Feeds one event into the state machine and returns the actions the
    /// caller must carry out (messages to send, commands to apply).
    ///
    /// # Errors
    ///
    /// Returns [`Error::NotLeader`] when [`Event::Propose`] reaches a node that
    /// is not the leader, and propagates any error raised by the log while
    /// appending entries or persisting hard state.
    pub fn step(&mut self, event: Event) -> Result<Vec<Action>> {
        match event {
            Event::Tick => self.tick(),
            Event::Message(message) => self.handle_message(message),
            Event::Propose(command) => self.propose(command),
        }
    }

    /// Advances the logical clock by one tick.
    ///
    /// Followers and candidates start an election once the randomized timeout
    /// expires. Leaders broadcast a heartbeat every `heartbeat_interval` ticks.
    fn tick(&mut self) -> Result<Vec<Action>> {
        let mut actions = Vec::new();
        match self.role {
            Role::Follower | Role::Candidate => {
                self.election_elapsed += 1;
                if self.election_elapsed >= self.election_timeout {
                    self.start_election(&mut actions)?;
                }
            }
            Role::Leader => {
                self.heartbeat_elapsed += 1;
                if self.heartbeat_elapsed >= self.heartbeat_interval {
                    self.heartbeat_elapsed = 0;
                    self.broadcast_heartbeat(&mut actions);
                }
            }
        }
        Ok(actions)
    }

    /// Becomes a candidate for the next term and requests votes from all peers.
    ///
    /// The self-vote is persisted before any `RequestVote` is sent. If that vote
    /// alone reaches quorum (single-node cluster), the node becomes leader at once.
    fn start_election(&mut self, actions: &mut Vec<Action>) -> Result<()> {
        self.role = Role::Candidate;
        self.current_term += 1;
        self.voted_for = Some(self.id);
        self.leader_id = None;
        self.votes.clear();
        self.votes.push(self.id);
        self.reset_election_timer();
        self.persist_hard_state()?;
        if self.votes.len() >= self.quorum {
            self.become_leader(actions);
            return Ok(());
        }
        let last_log_index = self.log.last_index();
        let last_log_term = self.log.last_term();
        for &peer in &self.peers {
            actions.push(Action::Send {
                to: peer,
                message: Message::RequestVote(RequestVote {
                    term: self.current_term,
                    candidate: self.id,
                    last_log_index,
                    last_log_term,
                }),
            });
        }
        Ok(())
    }

    /// Assumes leadership for the current term and announces it with a heartbeat.
    ///
    /// No no-op entry is appended. Entries from earlier terms therefore stay
    /// uncommitted until an entry from this term is committed, because
    /// `advance_commit_as_leader` requires the last entry to be from the
    /// current term.
    fn become_leader(&mut self, actions: &mut Vec<Action>) {
        self.role = Role::Leader;
        self.leader_id = Some(self.id);
        self.heartbeat_elapsed = 0;
        self.broadcast_heartbeat(actions);
        self.advance_commit_as_leader(actions);
    }

    /// Sends an empty `AppendEntries` to every peer.
    ///
    /// `prev_log_index`/`prev_log_term` are the leader's own last entry because
    /// no per-follower progress is tracked yet.
    fn broadcast_heartbeat(&self, actions: &mut Vec<Action>) {
        let prev_log_index = self.log.last_index();
        let prev_log_term = self.log.last_term();
        for &peer in &self.peers {
            actions.push(Action::Send {
                to: peer,
                message: Message::AppendEntries(AppendEntries {
                    term: self.current_term,
                    leader: self.id,
                    prev_log_index,
                    prev_log_term,
                    entries: Vec::new(),
                    leader_commit: self.commit_index,
                }),
            });
        }
    }

    /// Appends `command` to the leader's log, syncs it, and commits it if the
    /// quorum allows.
    ///
    /// Returns `Error::NotLeader` (carrying the known leader, if any) on
    /// non-leaders.
    fn propose(&mut self, command: Vec<u8>) -> Result<Vec<Action>> {
        if self.role != Role::Leader {
            return Err(Error::NotLeader {
                leader: self.leader_id,
            });
        }
        let index = self.log.last_index() + 1;
        let entry = LogEntry::new(self.current_term, index, command);
        self.log.append(core::slice::from_ref(&entry))?;
        self.log.sync()?;
        let mut actions = Vec::new();
        self.advance_commit_as_leader(&mut actions);
        Ok(actions)
    }

    /// Commits the leader's last entry when enough replicas hold it and it
    /// belongs to the current term.
    fn advance_commit_as_leader(&mut self, actions: &mut Vec<Action>) {
        let last = self.log.last_index();
        // Only the leader itself is counted: follower match indexes are not
        // tracked yet (`handle_append_reply` ignores replies). In practice this
        // commits only in single-node clusters.
        let replicas_with_last = 1;
        if replicas_with_last >= self.quorum
            && last > self.commit_index
            && self.log.term_at(last) == Some(self.current_term)
        {
            self.commit_index = last;
            self.drain_applies(actions);
        }
    }

    /// Emits an `Action::Apply` for every committed-but-unapplied entry, in order.
    ///
    /// `last_applied` only advances past an index once its entry was actually
    /// emitted. If the log cannot produce an entry, draining stops there instead
    /// of silently skipping a committed command.
    fn drain_applies(&mut self, actions: &mut Vec<Action>) {
        while self.last_applied < self.commit_index {
            let next = self.last_applied + 1;
            match self.log.entry(next) {
                Some(entry) => {
                    actions.push(Action::Apply {
                        index: entry.index,
                        term: entry.term,
                        command: entry.command,
                    });
                    self.last_applied = next;
                }
                None => break,
            }
        }
    }

    /// Dispatches an incoming protocol message.
    ///
    /// Any message carrying a newer term first makes this node a follower of
    /// that term (persisting the new hard state) before the message is handled.
    fn handle_message(&mut self, message: Message) -> Result<Vec<Action>> {
        if message.term() > self.current_term {
            self.become_follower(message.term(), None)?;
        }
        let mut actions = Vec::new();
        match message {
            Message::RequestVote(rv) => self.handle_request_vote(rv, &mut actions)?,
            Message::RequestVoteReply(reply) => self.handle_vote_reply(reply, &mut actions),
            Message::AppendEntries(ae) => self.handle_append_entries(ae, &mut actions)?,
            Message::AppendEntriesReply(reply) => self.handle_append_reply(reply),
        }
        Ok(actions)
    }

    /// Reverts to follower and records `leader` as the known leader.
    ///
    /// If `term` is newer than the current term, the node adopts it, clears its
    /// vote, and persists the new hard state.
    fn become_follower(&mut self, term: Term, leader: Option<NodeId>) -> Result<()> {
        let term_advanced = term > self.current_term;
        if term_advanced {
            self.current_term = term;
            self.voted_for = None;
        }
        self.role = Role::Follower;
        self.leader_id = leader;
        self.votes.clear();
        if term_advanced {
            self.persist_hard_state()?;
        }
        Ok(())
    }

    /// Answers a `RequestVote`.
    ///
    /// The vote is granted only if the request is not stale, this node has not
    /// voted for someone else this term, and the candidate's log is at least as
    /// up to date. A granted vote is persisted before replying and resets the
    /// election timer.
    fn handle_request_vote(&mut self, rv: RequestVote, actions: &mut Vec<Action>) -> Result<()> {
        let mut granted = false;
        if rv.term >= self.current_term {
            let can_vote = match self.voted_for {
                None => true,
                Some(c) => c == rv.candidate,
            };
            let log_ok = self.candidate_log_up_to_date(rv.last_log_term, rv.last_log_index);
            if can_vote && log_ok {
                granted = true;
                self.voted_for = Some(rv.candidate);
                self.persist_hard_state()?;
                self.reset_election_timer();
            }
        }
        actions.push(Action::Send {
            to: rv.candidate,
            message: Message::RequestVoteReply(RequestVoteReply {
                term: self.current_term,
                vote_granted: granted,
                from: self.id,
            }),
        });
        Ok(())
    }

    /// Raft's "at least as up to date" check.
    ///
    /// A higher last term wins; with equal last terms, the longer (or equal)
    /// log wins.
    fn candidate_log_up_to_date(&self, cand_last_term: Term, cand_last_index: Index) -> bool {
        let my_term = self.log.last_term();
        let my_index = self.log.last_index();
        cand_last_term > my_term || (cand_last_term == my_term && cand_last_index >= my_index)
    }

    /// Counts a vote for the current term and becomes leader on reaching quorum.
    ///
    /// Replies are ignored unless this node is still a candidate in the same
    /// term. Duplicate grants from the same node are counted once.
    fn handle_vote_reply(&mut self, reply: RequestVoteReply, actions: &mut Vec<Action>) {
        if self.role != Role::Candidate || reply.term != self.current_term {
            return;
        }
        if reply.vote_granted && !self.votes.contains(&reply.from) {
            self.votes.push(reply.from);
            if self.votes.len() >= self.quorum {
                self.become_leader(actions);
            }
        }
    }

    /// Handles `AppendEntries` from a (possibly stale) leader and always replies.
    ///
    /// For a current-term leader, the node steps down to follower, resets its
    /// election timer, and checks log consistency at `prev_log_index`.
    /// `ae.entries` are NOT written to the log yet. On success the reply's
    /// `match_index` is therefore `prev_log_index`, and the commit index can
    /// advance at most that far.
    fn handle_append_entries(
        &mut self,
        ae: AppendEntries,
        actions: &mut Vec<Action>,
    ) -> Result<()> {
        let mut success = false;
        let mut match_index = 0;
        if ae.term >= self.current_term {
            // Any newer term was already adopted in `handle_message`, so this
            // only steps down and records the leader; it does not re-persist.
            self.become_follower(ae.term, Some(ae.leader))?;
            self.reset_election_timer();
            let prev_ok = ae.prev_log_index == 0
                || self.log.term_at(ae.prev_log_index) == Some(ae.prev_log_term);
            if prev_ok {
                success = true;
                match_index = ae.prev_log_index;
                // Raft may only commit entries verified to match the leader,
                // i.e. up to `min(leader_commit, last new entry)`. Using the
                // local `last_index()` here could commit stale, conflicting
                // entries past `prev_log_index`. Commit never moves backwards.
                let new_commit = ae.leader_commit.min(match_index);
                if new_commit > self.commit_index {
                    self.commit_index = new_commit;
                    self.drain_applies(actions);
                }
            }
        }
        actions.push(Action::Send {
            to: ae.leader,
            message: Message::AppendEntriesReply(AppendEntriesReply {
                term: self.current_term,
                success,
                from: self.id,
                match_index,
            }),
        });
        Ok(())
    }

    /// Replication progress is not tracked yet, so replies are ignored.
    fn handle_append_reply(&mut self, _reply: AppendEntriesReply) {}

    /// Writes the current term and vote to the log and syncs it.
    fn persist_hard_state(&mut self) -> Result<()> {
        self.log.set_hard_state(HardState {
            term: self.current_term,
            voted_for: self.voted_for,
        })?;
        self.log.sync()
    }

    /// Restarts the election timer with a freshly randomized timeout.
    fn reset_election_timer(&mut self) {
        self.election_elapsed = 0;
        self.election_timeout = self
            .rng
            .gen_range(self.election_timeout_min, self.election_timeout_max);
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]
    use super::*;

    /// Ticks `node` until it becomes leader, panicking after 1 000 ticks.
    fn drive_to_leader(node: &mut RaftNode) {
        for _ in 0..1_000 {
            if node.is_leader() {
                return;
            }
            let _ = node.step(Event::Tick).expect("tick");
        }
        panic!("node never became leader");
    }

    /// Returns true if `actions` contains a `RequestVoteReply` granting the vote.
    fn vote_granted(actions: &[Action]) -> bool {
        actions.iter().any(|a| {
            matches!(
                a,
                Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
            )
        })
    }

    /// Delivers a `RequestVote` for `term` from `candidate`, which claims an empty log.
    fn request_vote(node: &mut RaftNode, term: Term, candidate: NodeId) -> Vec<Action> {
        node.step(Event::Message(Message::RequestVote(RequestVote {
            term,
            candidate,
            last_log_index: 0,
            last_log_term: 0,
        })))
        .unwrap()
    }

    #[test]
    fn test_single_node_elects_itself() {
        let mut node = RaftNode::new(RaftConfig::single(1));
        drive_to_leader(&mut node);
        assert_eq!(node.role(), Role::Leader);
        assert_eq!(node.leader(), Some(1));
        assert_eq!(node.term(), 1);
    }

    #[test]
    fn test_single_node_commits_proposal() {
        let mut node = RaftNode::new(RaftConfig::single(1));
        drive_to_leader(&mut node);
        let actions = node.step(Event::Propose(b"a".to_vec())).unwrap();
        assert_eq!(node.commit_index(), 1);
        assert_eq!(node.last_applied(), 1);
        let applied: Vec<_> = actions
            .iter()
            .filter(|a| matches!(a, Action::Apply { .. }))
            .collect();
        assert_eq!(applied.len(), 1);
    }

    #[test]
    fn test_propose_to_follower_is_rejected() {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
        let err = node.step(Event::Propose(b"a".to_vec())).unwrap_err();
        assert!(matches!(err, Error::NotLeader { .. }));
    }

    #[test]
    fn test_candidate_requests_votes_from_peers() {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
        let mut sends = Vec::new();
        for _ in 0..1_000 {
            let actions = node.step(Event::Tick).unwrap();
            if !actions.is_empty() {
                sends = actions;
                break;
            }
        }
        assert_eq!(node.role(), Role::Candidate);
        let targets: Vec<NodeId> = sends
            .iter()
            .filter_map(|a| match a {
                Action::Send {
                    to,
                    message: Message::RequestVote(_),
                } => Some(*to),
                _ => None,
            })
            .collect();
        assert_eq!(targets.len(), 2);
        assert!(targets.contains(&2) && targets.contains(&3));
    }

    #[test]
    fn test_node_grants_one_vote_then_refuses_another_candidate() {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
        assert!(vote_granted(&request_vote(&mut node, 5, 2)));
        assert!(!vote_granted(&request_vote(&mut node, 5, 3)));
    }

    #[test]
    fn test_higher_term_message_steps_node_down() {
        let mut node = RaftNode::new(RaftConfig::single(1));
        drive_to_leader(&mut node);
        let leader_term = node.term();
        let _ = node
            .step(Event::Message(Message::AppendEntries(AppendEntries {
                term: leader_term + 5,
                leader: 9,
                prev_log_index: 0,
                prev_log_term: 0,
                entries: Vec::new(),
                leader_commit: 0,
            })))
            .unwrap();
        assert_eq!(node.role(), Role::Follower);
        assert_eq!(node.term(), leader_term + 5);
        assert_eq!(node.leader(), Some(9));
    }

    #[test]
    fn test_stale_term_request_vote_is_refused() {
        let mut node = RaftNode::new(RaftConfig::single(1));
        drive_to_leader(&mut node);
        let term = node.term();
        let actions = node
            .step(Event::Message(Message::RequestVote(RequestVote {
                term: term - 1,
                candidate: 2,
                last_log_index: 99,
                last_log_term: 99,
            })))
            .unwrap();
        assert!(!vote_granted(&actions));
    }

    #[test]
    fn test_heartbeat_resets_follower_election_timer() {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5));
        let _ = node.step(Event::Tick).unwrap();
        let _ = node.step(Event::Tick).unwrap();
        let _ = node
            .step(Event::Message(Message::AppendEntries(AppendEntries {
                term: 1,
                leader: 2,
                prev_log_index: 0,
                prev_log_term: 0,
                entries: Vec::new(),
                leader_commit: 0,
            })))
            .unwrap();
        assert_eq!(node.role(), Role::Follower);
        assert_eq!(node.leader(), Some(2));
        let _ = node.step(Event::Tick).unwrap();
        assert_eq!(node.role(), Role::Follower);
    }

    /// Ticks a 3-node member into candidacy, then feeds it one granted vote so
    /// that (with its self-vote) it reaches quorum and becomes leader.
    fn elect_multi_node_leader() -> RaftNode {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_heartbeat_interval(2));
        for _ in 0..1_000 {
            let actions = node.step(Event::Tick).expect("tick");
            if !actions.is_empty() {
                break;
            }
        }
        assert_eq!(node.role(), Role::Candidate);
        let term = node.term();
        let _ = node
            .step(Event::Message(Message::RequestVoteReply(
                RequestVoteReply {
                    term,
                    vote_granted: true,
                    from: 2,
                },
            )))
            .expect("vote reply");
        assert_eq!(node.role(), Role::Leader);
        node
    }

    #[test]
    fn test_vote_replies_elect_a_multi_node_leader() {
        let node = elect_multi_node_leader();
        assert_eq!(node.leader(), Some(1));
    }

    #[test]
    fn test_leader_emits_heartbeats_on_interval() {
        let mut node = elect_multi_node_leader();
        let first = node.step(Event::Tick).unwrap();
        assert!(first.is_empty());
        let second = node.step(Event::Tick).unwrap();
        let heartbeats = second
            .iter()
            .filter(|a| {
                matches!(
                    a,
                    Action::Send {
                        message: Message::AppendEntries(_),
                        ..
                    }
                )
            })
            .count();
        assert_eq!(heartbeats, 2);
    }

    #[test]
    fn test_persisted_hard_state_is_restored() {
        let mut log = MemoryLog::new();
        log.set_hard_state(HardState {
            term: 7,
            voted_for: Some(3),
        })
        .unwrap();
        let mut node = RaftNode::with_log(RaftConfig::single(1), log);
        assert_eq!(node.term(), 7);
        // The restored vote for node 3 must block a vote for anyone else in term 7.
        assert!(!vote_granted(&request_vote(&mut node, 7, 2)));
    }

    #[test]
    fn test_vote_is_persisted_to_log() {
        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
        let _ = request_vote(&mut node, 4, 2);
        assert_eq!(
            node.log().hard_state(),
            HardState {
                term: 4,
                voted_for: Some(2)
            }
        );
    }
}