use crate::config::RaftConfig;
use crate::error::{Error, Result};
use crate::log::{MemoryLog, RaftLog};
use crate::message::{
AppendEntries, AppendEntriesReply, InstallSnapshot, InstallSnapshotReply, Message, RequestVote,
RequestVoteReply, TimeoutNow,
};
use crate::rng::Rng;
use crate::types::{HardState, Index, LogEntry, NodeId, Role, Snapshot, Term};
fn sorted_members(ids: impl IntoIterator<Item = NodeId>) -> Vec<NodeId> {
let mut v: Vec<NodeId> = ids.into_iter().collect();
v.sort_unstable();
v.dedup();
v
}
pub enum Event {
Tick,
Message(Message),
Propose(Vec<u8>),
Snapshot {
index: Index,
data: Vec<u8>,
},
AddServer(NodeId),
RemoveServer(NodeId),
TransferLeadership(NodeId),
}
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Action {
Send {
to: NodeId,
message: Message,
},
Apply {
index: Index,
term: Term,
command: Vec<u8>,
},
Snapshot {
index: Index,
term: Term,
},
RestoreSnapshot {
index: Index,
term: Term,
data: Vec<u8>,
},
MembershipChanged {
members: Vec<NodeId>,
},
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProgressState {
Probe,
Replicate,
}
#[derive(Clone, Copy, Debug)]
struct Progress {
id: NodeId,
next_index: Index,
match_index: Index,
state: ProgressState,
}
pub struct RaftNode<L: RaftLog = MemoryLog> {
id: NodeId,
voters: Vec<NodeId>,
base_config: Vec<NodeId>,
config_index: Index,
election_timeout_min: u32,
election_timeout_max: u32,
heartbeat_interval: u32,
max_batch: usize,
snapshot_threshold: usize,
log: L,
role: Role,
current_term: Term,
voted_for: Option<NodeId>,
leader_id: Option<NodeId>,
commit_index: Index,
last_applied: Index,
election_elapsed: u32,
heartbeat_elapsed: u32,
election_timeout: u32,
votes: Vec<NodeId>,
progress: Vec<Progress>,
snapshot_hinted_at: Index,
transfer_target: Option<NodeId>,
rng: Rng,
}
impl RaftNode<MemoryLog> {
#[must_use]
pub fn new(config: RaftConfig) -> Self {
Self::with_log(config, MemoryLog::new())
}
}
impl<L: RaftLog> RaftNode<L> {
#[must_use]
pub fn with_log(config: RaftConfig, log: L) -> Self {
let hard = log.hard_state();
let base = log.snapshot_index();
let bootstrap = sorted_members(config.peers.iter().copied().chain([config.id]));
let base_config = match log.snapshot() {
Some(s) if !s.config.is_empty() => s.config,
_ => bootstrap,
};
let mut voters = base_config.clone();
let mut config_index = 0;
let mut i = log.last_index();
while i > base {
if let Some(members) = log.entry(i).and_then(|e| e.members()) {
voters = members;
config_index = i;
break;
}
i -= 1;
}
let mut rng = Rng::new(config.seed);
let election_timeout =
rng.gen_range(config.election_timeout_min, config.election_timeout_max);
Self {
id: config.id,
voters,
base_config,
config_index,
election_timeout_min: config.election_timeout_min,
election_timeout_max: config.election_timeout_max,
heartbeat_interval: config.heartbeat_interval,
max_batch: config.max_batch,
snapshot_threshold: config.snapshot_threshold,
log,
role: Role::Follower,
current_term: hard.term,
voted_for: hard.voted_for,
leader_id: None,
commit_index: base,
last_applied: base,
election_elapsed: 0,
heartbeat_elapsed: 0,
election_timeout,
votes: Vec::new(),
progress: Vec::new(),
snapshot_hinted_at: base,
transfer_target: None,
rng,
}
}
#[inline]
#[must_use]
pub fn id(&self) -> NodeId {
self.id
}
#[inline]
#[must_use]
pub fn role(&self) -> Role {
self.role
}
#[inline]
#[must_use]
pub fn is_leader(&self) -> bool {
self.role == Role::Leader
}
#[inline]
#[must_use]
pub fn term(&self) -> Term {
self.current_term
}
#[inline]
#[must_use]
pub fn leader(&self) -> Option<NodeId> {
self.leader_id
}
#[inline]
#[must_use]
pub fn commit_index(&self) -> Index {
self.commit_index
}
#[inline]
#[must_use]
pub fn last_applied(&self) -> Index {
self.last_applied
}
#[inline]
#[must_use]
pub fn log(&self) -> &L {
&self.log
}
#[inline]
#[must_use]
pub fn members(&self) -> &[NodeId] {
&self.voters
}
#[inline]
fn quorum(&self) -> usize {
self.voters.len() / 2 + 1
}
#[inline]
fn is_voter(&self) -> bool {
self.voters.contains(&self.id)
}
fn set_config(&mut self, voters: Vec<NodeId>, config_index: Index, actions: &mut Vec<Action>) {
let changed = voters != self.voters;
self.voters = voters;
self.config_index = config_index;
if self.role == Role::Leader {
self.rebuild_progress();
}
if changed {
actions.push(Action::MembershipChanged {
members: self.voters.clone(),
});
}
}
fn refresh_config(&mut self, actions: &mut Vec<Action>) {
let base = self.log.snapshot_index();
let mut voters = self.base_config.clone();
let mut config_index = 0;
let mut i = self.log.last_index();
while i > base {
if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
voters = members;
config_index = i;
break;
}
i -= 1;
}
self.set_config(voters, config_index, actions);
}
fn rebuild_progress(&mut self) {
let next = self.log.last_index() + 1;
let old = core::mem::take(&mut self.progress);
self.progress = self
.voters
.iter()
.filter(|&&id| id != self.id)
.map(|&id| {
old.iter()
.find(|p| p.id == id)
.copied()
.unwrap_or(Progress {
id,
next_index: next,
match_index: 0,
state: ProgressState::Probe,
})
})
.collect();
}
fn progress_index(&self, id: NodeId) -> Option<usize> {
self.progress.iter().position(|p| p.id == id)
}
pub fn step(&mut self, event: Event) -> Result<Vec<Action>> {
match event {
Event::Tick => self.tick(),
Event::Message(message) => self.handle_message(message),
Event::Propose(command) => self.propose(command),
Event::Snapshot { index, data } => self.handle_snapshot_event(index, data),
Event::AddServer(id) => self.change_membership(Some(id), None),
Event::RemoveServer(id) => self.change_membership(None, Some(id)),
Event::TransferLeadership(target) => self.transfer_leadership(target),
}
}
fn tick(&mut self) -> Result<Vec<Action>> {
let mut actions = Vec::new();
match self.role {
Role::Follower | Role::Candidate => {
self.election_elapsed += 1;
if self.election_elapsed >= self.election_timeout && self.is_voter() {
self.start_election(false, &mut actions)?;
}
}
Role::Leader => {
self.heartbeat_elapsed += 1;
if self.heartbeat_elapsed >= self.heartbeat_interval {
self.heartbeat_elapsed = 0;
self.replicate_to_all(&mut actions);
}
}
}
Ok(actions)
}
fn start_election(&mut self, force: bool, actions: &mut Vec<Action>) -> Result<()> {
self.role = Role::Candidate;
self.current_term += 1;
self.voted_for = Some(self.id);
self.leader_id = None;
self.transfer_target = None;
self.progress.clear();
self.votes.clear();
self.votes.push(self.id);
self.reset_election_timer();
self.persist_hard_state()?;
if self.votes.len() >= self.quorum() {
self.become_leader(actions);
return Ok(());
}
let last_log_index = self.log.last_index();
let last_log_term = self.log.last_term();
let term = self.current_term;
let id = self.id;
for &peer in &self.voters {
if peer == id {
continue;
}
actions.push(Action::Send {
to: peer,
message: Message::RequestVote(RequestVote {
term,
candidate: id,
last_log_index,
last_log_term,
force,
}),
});
}
Ok(())
}
fn become_leader(&mut self, actions: &mut Vec<Action>) {
self.role = Role::Leader;
self.leader_id = Some(self.id);
self.heartbeat_elapsed = 0;
self.transfer_target = None;
self.rebuild_progress();
self.replicate_to_all(actions);
self.advance_commit(actions);
}
fn replicate_to_all(&mut self, actions: &mut Vec<Action>) {
for i in 0..self.progress.len() {
self.send_append(i, actions);
}
}
fn replicate_to_streaming(&mut self, actions: &mut Vec<Action>) {
for i in 0..self.progress.len() {
if self.progress[i].state == ProgressState::Replicate {
self.send_append(i, actions);
}
}
}
fn send_append(&mut self, i: usize, actions: &mut Vec<Action>) {
let next = self.progress[i].next_index;
if next <= self.log.snapshot_index() {
self.send_snapshot(i, actions);
return;
}
let peer = self.progress[i].id;
let state = self.progress[i].state;
let prev_log_index = next - 1;
let prev_log_term = self.log.term_at(prev_log_index).unwrap_or(0);
let last = self.log.last_index();
let entries = if last >= next {
let to = last.min(next + self.max_batch as Index - 1);
self.log.entries(next, to)
} else {
Vec::new()
};
let count = entries.len() as Index;
actions.push(Action::Send {
to: peer,
message: Message::AppendEntries(AppendEntries {
term: self.current_term,
leader: self.id,
prev_log_index,
prev_log_term,
entries,
leader_commit: self.commit_index,
}),
});
if count > 0 && state == ProgressState::Replicate {
self.progress[i].next_index = next + count;
}
}
fn send_snapshot(&mut self, i: usize, actions: &mut Vec<Action>) {
if let Some(snapshot) = self.log.snapshot() {
self.progress[i].state = ProgressState::Probe;
actions.push(Action::Send {
to: self.progress[i].id,
message: Message::InstallSnapshot(InstallSnapshot {
term: self.current_term,
leader: self.id,
snapshot,
}),
});
}
}
fn propose(&mut self, command: Vec<u8>) -> Result<Vec<Action>> {
if self.role != Role::Leader || self.transfer_target.is_some() {
return Err(Error::NotLeader {
leader: self.leader_id,
});
}
let index = self.log.last_index() + 1;
let entry = LogEntry::new(self.current_term, index, command);
self.log.append(core::slice::from_ref(&entry))?;
self.log.sync()?;
let mut actions = Vec::new();
self.replicate_to_streaming(&mut actions);
self.advance_commit(&mut actions);
Ok(actions)
}
fn advance_commit(&mut self, actions: &mut Vec<Action>) {
let last = self.log.last_index();
let quorum = self.quorum();
let leader_holds = usize::from(self.is_voter());
let mut new_commit = self.commit_index;
let mut n = last;
while n > self.commit_index {
match self.log.term_at(n) {
Some(term) if term == self.current_term => {
let mut replicas = leader_holds;
for p in &self.progress {
if p.match_index >= n {
replicas += 1;
}
}
if replicas >= quorum {
new_commit = n;
break; }
}
Some(term) if term < self.current_term => break,
_ => {}
}
n -= 1;
}
if new_commit > self.commit_index {
self.commit_index = new_commit;
self.drain_applies(actions);
if self.role == Role::Leader
&& !self.is_voter()
&& self.config_index != 0
&& self.commit_index >= self.config_index
{
self.step_down_to_follower();
}
}
}
fn step_down_to_follower(&mut self) {
self.role = Role::Follower;
self.leader_id = None;
self.transfer_target = None;
self.progress.clear();
self.votes.clear();
}
fn drain_applies(&mut self, actions: &mut Vec<Action>) {
while self.last_applied < self.commit_index {
self.last_applied += 1;
if let Some(entry) = self.log.entry(self.last_applied) {
if entry.members().is_none() {
actions.push(Action::Apply {
index: entry.index,
term: entry.term,
command: entry.command,
});
}
}
}
self.maybe_hint_snapshot(actions);
}
fn maybe_hint_snapshot(&mut self, actions: &mut Vec<Action>) {
if self.snapshot_threshold == 0 {
return;
}
let base = self.log.snapshot_index();
let grown = self.last_applied.saturating_sub(base) as usize;
if grown >= self.snapshot_threshold && self.last_applied > self.snapshot_hinted_at {
if let Some(term) = self.log.term_at(self.last_applied) {
self.snapshot_hinted_at = self.last_applied;
actions.push(Action::Snapshot {
index: self.last_applied,
term,
});
}
}
}
fn handle_snapshot_event(&mut self, index: Index, data: Vec<u8>) -> Result<Vec<Action>> {
if index > self.commit_index
|| index > self.last_applied
|| index <= self.log.snapshot_index()
{
return Ok(Vec::new());
}
let Some(term) = self.log.term_at(index) else {
return Ok(Vec::new());
};
let config = self.config_at(index);
self.base_config = config.clone();
self.log
.apply_snapshot(&Snapshot::with_config(index, term, config, data))?;
self.log.sync()?;
if self.snapshot_hinted_at < index {
self.snapshot_hinted_at = index;
}
let mut actions = Vec::new();
self.refresh_config(&mut actions);
Ok(actions)
}
fn config_at(&self, index: Index) -> Vec<NodeId> {
let base = self.log.snapshot_index();
let mut i = index.min(self.log.last_index());
while i > base {
if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
return members;
}
i -= 1;
}
self.base_config.clone()
}
fn handle_message(&mut self, message: Message) -> Result<Vec<Action>> {
if matches!(message, Message::RequestVote(ref rv) if !rv.force)
&& self.leader_id.is_some()
&& self.election_elapsed < self.election_timeout_min
{
return Ok(Vec::new());
}
if message.term() > self.current_term {
self.become_follower(message.term(), None)?;
}
let mut actions = Vec::new();
match message {
Message::RequestVote(rv) => self.handle_request_vote(rv, &mut actions)?,
Message::RequestVoteReply(reply) => self.handle_vote_reply(reply, &mut actions),
Message::AppendEntries(ae) => self.handle_append_entries(ae, &mut actions)?,
Message::AppendEntriesReply(reply) => self.handle_append_reply(reply, &mut actions),
Message::InstallSnapshot(rpc) => self.handle_install_snapshot(rpc, &mut actions)?,
Message::InstallSnapshotReply(reply) => {
self.handle_install_snapshot_reply(reply, &mut actions);
}
Message::TimeoutNow(rpc) => self.handle_timeout_now(rpc, &mut actions)?,
}
Ok(actions)
}
fn become_follower(&mut self, term: Term, leader: Option<NodeId>) -> Result<()> {
let hard_state_changed = term > self.current_term;
self.role = Role::Follower;
if term > self.current_term {
self.current_term = term;
self.voted_for = None;
}
self.leader_id = leader;
self.transfer_target = None;
self.votes.clear();
self.progress.clear();
if hard_state_changed {
self.persist_hard_state()?;
}
Ok(())
}
fn handle_request_vote(&mut self, rv: RequestVote, actions: &mut Vec<Action>) -> Result<()> {
let mut granted = false;
if rv.term >= self.current_term {
let can_vote = match self.voted_for {
None => true,
Some(c) => c == rv.candidate,
};
let log_ok = self.candidate_log_up_to_date(rv.last_log_term, rv.last_log_index);
if can_vote && log_ok {
granted = true;
self.voted_for = Some(rv.candidate);
self.persist_hard_state()?;
self.reset_election_timer();
}
}
actions.push(Action::Send {
to: rv.candidate,
message: Message::RequestVoteReply(RequestVoteReply {
term: self.current_term,
vote_granted: granted,
from: self.id,
}),
});
Ok(())
}
fn candidate_log_up_to_date(&self, cand_last_term: Term, cand_last_index: Index) -> bool {
let my_term = self.log.last_term();
let my_index = self.log.last_index();
cand_last_term > my_term || (cand_last_term == my_term && cand_last_index >= my_index)
}
fn handle_vote_reply(&mut self, reply: RequestVoteReply, actions: &mut Vec<Action>) {
if self.role != Role::Candidate || reply.term != self.current_term {
return;
}
if reply.vote_granted && !self.votes.contains(&reply.from) {
self.votes.push(reply.from);
if self.votes.len() >= self.quorum() {
self.become_leader(actions);
}
}
}
fn handle_append_entries(
&mut self,
ae: AppendEntries,
actions: &mut Vec<Action>,
) -> Result<()> {
let mut reply = AppendEntriesReply {
term: self.current_term,
success: false,
from: self.id,
match_index: 0,
conflict_index: 0,
conflict_term: 0,
};
if ae.term < self.current_term {
actions.push(Action::Send {
to: ae.leader,
message: Message::AppendEntriesReply(reply),
});
return Ok(());
}
self.role = Role::Follower;
self.leader_id = Some(ae.leader);
self.reset_election_timer();
let base = self.log.snapshot_index();
if ae.prev_log_index < base {
if ae.leader_commit > self.commit_index {
self.commit_index = ae.leader_commit.min(base);
self.drain_applies(actions);
}
reply.success = true;
reply.match_index = base;
actions.push(Action::Send {
to: ae.leader,
message: Message::AppendEntriesReply(reply),
});
return Ok(());
}
let prev_ok = self.log.term_at(ae.prev_log_index) == Some(ae.prev_log_term);
if !prev_ok {
let last = self.log.last_index();
if ae.prev_log_index > last {
reply.conflict_index = last + 1;
reply.conflict_term = 0;
} else {
let conflict_term = self.log.term_at(ae.prev_log_index).unwrap_or(0);
reply.conflict_term = conflict_term;
reply.conflict_index = self.first_index_of_term(conflict_term, ae.prev_log_index);
}
actions.push(Action::Send {
to: ae.leader,
message: Message::AppendEntriesReply(reply),
});
return Ok(());
}
let (match_index, truncated) = if ae.entries.is_empty() {
(ae.prev_log_index, false)
} else {
self.append_from_leader(&ae.entries)?
};
if truncated || ae.entries.iter().any(|e| e.members().is_some()) {
self.refresh_config(actions);
}
if ae.leader_commit > self.commit_index {
self.commit_index = ae.leader_commit.min(match_index);
self.drain_applies(actions);
}
reply.success = true;
reply.match_index = match_index;
actions.push(Action::Send {
to: ae.leader,
message: Message::AppendEntriesReply(reply),
});
Ok(())
}
fn append_from_leader(&mut self, entries: &[LogEntry]) -> Result<(Index, bool)> {
let mut i = 0;
let mut truncated = false;
while i < entries.len() {
let entry = &entries[i];
match self.log.term_at(entry.index) {
Some(term) if term == entry.term => i += 1,
Some(_) => {
self.log.truncate(entry.index)?;
truncated = true;
break;
}
None => break, }
}
if i < entries.len() {
self.log.append(&entries[i..])?;
self.log.sync()?;
}
Ok((entries[entries.len() - 1].index, truncated))
}
fn handle_append_reply(&mut self, reply: AppendEntriesReply, actions: &mut Vec<Action>) {
if self.role != Role::Leader || reply.term != self.current_term {
return; }
let Some(i) = self.progress_index(reply.from) else {
return;
};
if reply.success {
if reply.match_index > self.progress[i].match_index {
self.progress[i].match_index = reply.match_index;
}
let want_next = self.progress[i].match_index + 1;
if want_next > self.progress[i].next_index {
self.progress[i].next_index = want_next;
}
self.progress[i].state = ProgressState::Replicate;
self.advance_commit(actions);
self.maybe_send_timeout_now(reply.from, actions);
if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
self.send_append(i, actions);
}
} else {
let next = self.progress[i].next_index;
let matched = self.progress[i].match_index;
self.progress[i].next_index =
self.rejected_next(next, matched, reply.conflict_index, reply.conflict_term);
self.progress[i].state = ProgressState::Probe;
self.send_append(i, actions);
}
}
fn handle_install_snapshot(
&mut self,
rpc: InstallSnapshot,
actions: &mut Vec<Action>,
) -> Result<()> {
if rpc.term < self.current_term {
actions.push(Action::Send {
to: rpc.leader,
message: Message::InstallSnapshotReply(InstallSnapshotReply {
term: self.current_term,
from: self.id,
last_index: 0,
}),
});
return Ok(());
}
self.role = Role::Follower;
self.leader_id = Some(rpc.leader);
self.reset_election_timer();
let snap_index = rpc.snapshot.index;
let snap_term = rpc.snapshot.term;
if snap_index > self.log.snapshot_index() && snap_index > self.commit_index {
if !rpc.snapshot.config.is_empty() {
self.base_config = rpc.snapshot.config.clone();
}
self.log.apply_snapshot(&rpc.snapshot)?;
self.log.sync()?;
self.commit_index = snap_index;
self.last_applied = snap_index;
if snap_index > self.snapshot_hinted_at {
self.snapshot_hinted_at = snap_index;
}
self.refresh_config(actions);
actions.push(Action::RestoreSnapshot {
index: snap_index,
term: snap_term,
data: rpc.snapshot.data,
});
}
let last_index = self
.log
.snapshot_index()
.max(snap_index.min(self.commit_index));
actions.push(Action::Send {
to: rpc.leader,
message: Message::InstallSnapshotReply(InstallSnapshotReply {
term: self.current_term,
from: self.id,
last_index,
}),
});
Ok(())
}
fn handle_install_snapshot_reply(
&mut self,
reply: InstallSnapshotReply,
actions: &mut Vec<Action>,
) {
if self.role != Role::Leader || reply.term != self.current_term {
return;
}
let Some(i) = self.progress_index(reply.from) else {
return;
};
if reply.last_index > self.progress[i].match_index {
self.progress[i].match_index = reply.last_index;
}
self.progress[i].next_index = self.progress[i].match_index + 1;
self.progress[i].state = ProgressState::Replicate;
self.advance_commit(actions);
self.maybe_send_timeout_now(reply.from, actions);
if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
self.send_append(i, actions);
}
}
fn rejected_next(
&self,
current_next: Index,
match_index: Index,
conflict_index: Index,
conflict_term: Term,
) -> Index {
let floor = match_index + 1;
let mut target = conflict_index.max(1);
if conflict_term > 0 {
if let Some(last) = self.last_index_of_term(conflict_term) {
target = last + 1;
}
}
let ceil = current_next.saturating_sub(1).max(floor);
target.clamp(floor, ceil)
}
fn first_index_of_term(&self, term: Term, upto: Index) -> Index {
let mut i = upto;
while i > 1 && self.log.term_at(i - 1) == Some(term) {
i -= 1;
}
i
}
fn last_index_of_term(&self, term: Term) -> Option<Index> {
let mut i = self.log.last_index();
while i >= 1 {
match self.log.term_at(i) {
Some(t) if t == term => return Some(i),
Some(t) if t < term => return None,
_ => {}
}
i -= 1;
}
None
}
fn change_membership(
&mut self,
add: Option<NodeId>,
remove: Option<NodeId>,
) -> Result<Vec<Action>> {
if self.role != Role::Leader || self.transfer_target.is_some() {
return Err(Error::NotLeader {
leader: self.leader_id,
});
}
if self.config_index > self.commit_index {
return Err(Error::ConfigInProgress);
}
let mut members = self.voters.clone();
if let Some(id) = add {
if !members.contains(&id) {
members.push(id);
}
}
if let Some(id) = remove {
members.retain(|&m| m != id);
}
let members = sorted_members(members);
if members == self.voters {
return Ok(Vec::new()); }
let index = self.log.last_index() + 1;
let entry = LogEntry::config(self.current_term, index, &members);
self.log.append(core::slice::from_ref(&entry))?;
self.log.sync()?;
let mut actions = Vec::new();
self.set_config(members, index, &mut actions);
self.replicate_to_all(&mut actions);
self.advance_commit(&mut actions);
Ok(actions)
}
fn transfer_leadership(&mut self, target: NodeId) -> Result<Vec<Action>> {
if self.role != Role::Leader || target == self.id || !self.voters.contains(&target) {
return Ok(Vec::new());
}
self.transfer_target = Some(target);
let mut actions = Vec::new();
self.maybe_send_timeout_now(target, &mut actions);
if self.transfer_target.is_some() {
if let Some(i) = self.progress_index(target) {
self.send_append(i, &mut actions);
}
}
Ok(actions)
}
fn maybe_send_timeout_now(&mut self, target: NodeId, actions: &mut Vec<Action>) {
if self.transfer_target != Some(target) {
return;
}
let caught_up = self
.progress_index(target)
.is_some_and(|i| self.progress[i].match_index >= self.log.last_index());
if caught_up {
self.transfer_target = None;
actions.push(Action::Send {
to: target,
message: Message::TimeoutNow(TimeoutNow {
term: self.current_term,
leader: self.id,
}),
});
}
}
fn handle_timeout_now(&mut self, rpc: TimeoutNow, actions: &mut Vec<Action>) -> Result<()> {
if rpc.term < self.current_term || !self.is_voter() {
return Ok(());
}
self.start_election(true, actions)
}
fn persist_hard_state(&mut self) -> Result<()> {
self.log.set_hard_state(HardState {
term: self.current_term,
voted_for: self.voted_for,
})?;
self.log.sync()
}
fn reset_election_timer(&mut self) {
self.election_elapsed = 0;
self.election_timeout = self
.rng
.gen_range(self.election_timeout_min, self.election_timeout_max);
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used, clippy::expect_used)]
use super::*;
fn drive_to_leader(node: &mut RaftNode) {
for _ in 0..1_000 {
if node.is_leader() {
return;
}
let _ = node.step(Event::Tick).expect("tick");
}
panic!("node never became leader");
}
#[test]
fn test_single_node_elects_itself() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node);
assert_eq!(node.role(), Role::Leader);
assert_eq!(node.leader(), Some(1));
assert_eq!(node.term(), 1);
}
#[test]
fn test_single_node_commits_proposal() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node);
let actions = node.step(Event::Propose(b"a".to_vec())).unwrap();
assert_eq!(node.commit_index(), 1);
assert_eq!(node.last_applied(), 1);
let applied: Vec<_> = actions
.iter()
.filter(|a| matches!(a, Action::Apply { .. }))
.collect();
assert_eq!(applied.len(), 1);
}
#[test]
fn test_propose_to_follower_is_rejected() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
let err = node.step(Event::Propose(b"a".to_vec())).unwrap_err();
assert!(matches!(err, Error::NotLeader { .. }));
}
#[test]
fn test_candidate_requests_votes_from_peers() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
let mut sends = Vec::new();
for _ in 0..1_000 {
let actions = node.step(Event::Tick).unwrap();
if !actions.is_empty() {
sends = actions;
break;
}
}
assert_eq!(node.role(), Role::Candidate);
let targets: Vec<NodeId> = sends
.iter()
.filter_map(|a| match a {
Action::Send {
to,
message: Message::RequestVote(_),
} => Some(*to),
_ => None,
})
.collect();
assert_eq!(targets.len(), 2);
assert!(targets.contains(&2) && targets.contains(&3));
}
#[test]
fn test_node_grants_one_vote_then_refuses_another_candidate() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
let grant = |node: &mut RaftNode, candidate: NodeId| -> bool {
let actions = node
.step(Event::Message(Message::RequestVote(RequestVote {
term: 5,
candidate,
last_log_index: 0,
last_log_term: 0,
force: false,
})))
.unwrap();
actions.iter().any(|a| {
matches!(
a,
Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
)
})
};
assert!(grant(&mut node, 2));
assert!(!grant(&mut node, 3)); }
#[test]
fn test_higher_term_message_steps_node_down() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node);
let leader_term = node.term();
let _ = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: leader_term + 5,
leader: 9,
prev_log_index: 0,
prev_log_term: 0,
entries: Vec::new(),
leader_commit: 0,
})))
.unwrap();
assert_eq!(node.role(), Role::Follower);
assert_eq!(node.term(), leader_term + 5);
assert_eq!(node.leader(), Some(9));
}
#[test]
fn test_stale_term_request_vote_is_refused() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node); let term = node.term();
let actions = node
.step(Event::Message(Message::RequestVote(RequestVote {
term: term - 1,
candidate: 2,
last_log_index: 99,
last_log_term: 99,
force: false,
})))
.unwrap();
let granted = actions.iter().any(|a| {
matches!(
a,
Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
)
});
assert!(!granted);
}
#[test]
fn test_heartbeat_resets_follower_election_timer() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5));
let _ = node.step(Event::Tick).unwrap();
let _ = node.step(Event::Tick).unwrap();
let _ = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 1,
leader: 2,
prev_log_index: 0,
prev_log_term: 0,
entries: Vec::new(),
leader_commit: 0,
})))
.unwrap();
assert_eq!(node.role(), Role::Follower);
assert_eq!(node.leader(), Some(2));
let _ = node.step(Event::Tick).unwrap();
assert_eq!(node.role(), Role::Follower);
}
fn elect_multi_node_leader() -> RaftNode {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_heartbeat_interval(2));
for _ in 0..1_000 {
let actions = node.step(Event::Tick).expect("tick");
if !actions.is_empty() {
break; }
}
assert_eq!(node.role(), Role::Candidate);
let term = node.term();
let _ = node
.step(Event::Message(Message::RequestVoteReply(
RequestVoteReply {
term,
vote_granted: true,
from: 2,
},
)))
.expect("vote reply");
assert_eq!(node.role(), Role::Leader);
node
}
#[test]
fn test_vote_replies_elect_a_multi_node_leader() {
let node = elect_multi_node_leader();
assert_eq!(node.leader(), Some(1));
}
#[test]
fn test_leader_emits_heartbeats_on_interval() {
let mut node = elect_multi_node_leader();
let first = node.step(Event::Tick).unwrap();
assert!(first.is_empty());
let second = node.step(Event::Tick).unwrap();
let heartbeats = second
.iter()
.filter(|a| {
matches!(
a,
Action::Send {
message: Message::AppendEntries(_),
..
}
)
})
.count();
assert_eq!(heartbeats, 2);
}
#[test]
fn test_persisted_hard_state_is_restored() {
let mut log = MemoryLog::new();
log.set_hard_state(HardState {
term: 7,
voted_for: Some(3),
})
.unwrap();
let node = RaftNode::with_log(RaftConfig::single(1), log);
assert_eq!(node.term(), 7);
}
#[test]
fn test_vote_is_persisted_to_log() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
let _ = node
.step(Event::Message(Message::RequestVote(RequestVote {
term: 4,
candidate: 2,
last_log_index: 0,
last_log_term: 0,
force: false,
})))
.unwrap();
assert_eq!(
node.log().hard_state(),
HardState {
term: 4,
voted_for: Some(2)
}
);
}
fn entry(term: Term, index: Index) -> LogEntry {
LogEntry::new(term, index, vec![index as u8])
}
fn first_append_entries(actions: &[Action], to: NodeId) -> AppendEntries {
actions
.iter()
.find_map(|a| match a {
Action::Send {
to: dst,
message: Message::AppendEntries(ae),
} if *dst == to => Some(ae.clone()),
_ => None,
})
.expect("an AppendEntries to the peer")
}
#[test]
fn test_leader_replicates_and_commits_on_quorum() {
let mut node = elect_multi_node_leader();
let _ = node
.step(Event::Message(Message::AppendEntriesReply(
AppendEntriesReply {
term: node.term(),
success: true,
from: 2,
match_index: 0,
conflict_index: 0,
conflict_term: 0,
},
)))
.unwrap();
let actions = node.step(Event::Propose(b"x".to_vec())).unwrap();
assert_eq!(node.commit_index(), 0);
let ae = first_append_entries(&actions, 2);
assert_eq!(ae.entries.len(), 1);
assert_eq!(ae.entries[0].index, 1);
let applied = node
.step(Event::Message(Message::AppendEntriesReply(
AppendEntriesReply {
term: node.term(),
success: true,
from: 2,
match_index: 1,
conflict_index: 0,
conflict_term: 0,
},
)))
.unwrap();
assert_eq!(node.commit_index(), 1);
assert!(
applied
.iter()
.any(|a| matches!(a, Action::Apply { index: 1, .. }))
);
}
#[test]
fn test_follower_appends_streamed_entries() {
let mut node = RaftNode::new(RaftConfig::new(5, [1]));
let actions = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 2,
leader: 1,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![entry(2, 1), entry(2, 2)],
leader_commit: 2,
})))
.unwrap();
assert_eq!(node.log().last_index(), 2);
assert_eq!(node.commit_index(), 2);
let reply = actions
.iter()
.find_map(|a| match a {
Action::Send {
message: Message::AppendEntriesReply(r),
..
} => Some(r.clone()),
_ => None,
})
.expect("a reply");
assert!(reply.success);
assert_eq!(reply.match_index, 2);
}
#[test]
fn test_follower_truncates_divergent_tail() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(2, 2)]).unwrap();
let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
let actions = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 3,
leader: 1,
prev_log_index: 1,
prev_log_term: 1,
entries: vec![entry(3, 2)],
leader_commit: 0,
})))
.unwrap();
assert_eq!(node.log().last_index(), 2);
assert_eq!(node.log().entry(2).unwrap().term, 3);
let reply = first_reply(&actions);
assert!(reply.success);
assert_eq!(reply.match_index, 2);
}
#[test]
fn test_follower_rejects_short_log_with_length_hint() {
let mut node = RaftNode::new(RaftConfig::new(5, [1]));
let actions = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 2,
leader: 1,
prev_log_index: 3,
prev_log_term: 1,
entries: vec![entry(2, 4)],
leader_commit: 0,
})))
.unwrap();
let reply = first_reply(&actions);
assert!(!reply.success);
assert_eq!(reply.conflict_index, 1); assert_eq!(reply.conflict_term, 0);
}
#[test]
fn test_follower_rejects_term_mismatch_with_term_hint() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
.unwrap();
let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
let actions = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 5,
leader: 1,
prev_log_index: 3,
prev_log_term: 4, entries: Vec::new(),
leader_commit: 0,
})))
.unwrap();
let reply = first_reply(&actions);
assert!(!reply.success);
assert_eq!(reply.conflict_term, 1);
assert_eq!(reply.conflict_index, 1); }
#[test]
fn test_rejection_backtracks_then_converges() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
.unwrap();
log.set_hard_state(HardState {
term: 1,
voted_for: Some(1),
})
.unwrap();
let mut leader =
RaftNode::with_log(RaftConfig::new(1, [2]).with_election_timeout(5, 5), log);
let mut follower = RaftNode::new(RaftConfig::new(2, [1]));
let mut pending = Vec::new();
for _ in 0..50 {
let acts = leader.step(Event::Tick).unwrap();
if !acts.is_empty() {
pending = acts;
break;
}
}
let _ = leader
.step(Event::Message(Message::RequestVoteReply(
RequestVoteReply {
term: leader.term(),
vote_granted: true,
from: 2,
},
)))
.unwrap();
assert!(leader.is_leader());
let _ = pending;
let mut queue: Vec<(NodeId, Message)> = drain_sends(&mut leader);
for _ in 0..100 {
if follower.log().last_index() == 3 {
break;
}
let mut next = Vec::new();
for (to, msg) in queue.drain(..) {
let acts = if to == 2 {
follower.step(Event::Message(msg)).unwrap()
} else {
leader.step(Event::Message(msg)).unwrap()
};
next.extend(collect_sends(acts));
}
queue = next;
if queue.is_empty() {
queue = leader
.step(Event::Tick)
.unwrap()
.into_iter()
.filter_map(send_pair)
.collect();
}
}
assert_eq!(follower.log().last_index(), 3);
assert_eq!(follower.log().entry(3).unwrap().term, 1);
}
fn first_reply(actions: &[Action]) -> AppendEntriesReply {
actions
.iter()
.find_map(|a| match a {
Action::Send {
message: Message::AppendEntriesReply(r),
..
} => Some(r.clone()),
_ => None,
})
.expect("an AppendEntriesReply")
}
fn send_pair(a: Action) -> Option<(NodeId, Message)> {
match a {
Action::Send { to, message } => Some((to, message)),
_ => None,
}
}
fn collect_sends(actions: Vec<Action>) -> Vec<(NodeId, Message)> {
actions.into_iter().filter_map(send_pair).collect()
}
fn drain_sends(node: &mut RaftNode) -> Vec<(NodeId, Message)> {
let acts = node.step(Event::Tick).unwrap();
collect_sends(acts)
}
#[derive(Default)]
struct SyncCountLog {
inner: MemoryLog,
syncs: std::cell::Cell<u32>,
}
impl SyncCountLog {
fn syncs(&self) -> u32 {
self.syncs.get()
}
}
impl RaftLog for SyncCountLog {
fn last_index(&self) -> Index {
self.inner.last_index()
}
fn last_term(&self) -> Term {
self.inner.last_term()
}
fn term_at(&self, index: Index) -> Option<Term> {
self.inner.term_at(index)
}
fn entry(&self, index: Index) -> Option<LogEntry> {
self.inner.entry(index)
}
fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
self.inner.append(entries)
}
fn truncate(&mut self, from: Index) -> Result<()> {
self.inner.truncate(from)
}
fn hard_state(&self) -> HardState {
self.inner.hard_state()
}
fn set_hard_state(&mut self, state: HardState) -> Result<()> {
self.inner.set_hard_state(state)
}
fn sync(&mut self) -> Result<()> {
self.syncs.set(self.syncs.get() + 1);
self.inner.sync()
}
}
#[test]
fn test_granting_a_vote_persists_and_syncs_before_replying() {
let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), SyncCountLog::default());
let actions = node
.step(Event::Message(Message::RequestVote(RequestVote {
term: 4,
candidate: 2,
last_log_index: 0,
last_log_term: 0,
force: false,
})))
.unwrap();
assert!(actions.iter().any(|a| matches!(
a,
Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
)));
assert!(
node.log().syncs() >= 1,
"vote must be synced before the reply"
);
assert_eq!(node.log().hard_state().voted_for, Some(2));
}
#[test]
fn test_snapshot_hint_then_compaction() {
let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(2));
drive_to_leader(&mut node);
let mut hint = None;
for _ in 0..4 {
let actions = node.step(Event::Propose(b"c".to_vec())).unwrap();
if let Some(Action::Snapshot { index, term }) = actions
.iter()
.find(|a| matches!(a, Action::Snapshot { .. }))
.cloned()
{
hint = Some((index, term));
break;
}
}
let (index, _term) = hint.expect("a snapshot hint once the log grew");
assert!(index >= 2);
let _ = node
.step(Event::Snapshot {
index,
data: b"state".to_vec(),
})
.unwrap();
assert_eq!(node.log().snapshot_index(), index);
assert_eq!(node.log().entry(1), None); assert_eq!(node.commit_index(), node.commit_index()); }
#[test]
fn test_snapshot_event_rejects_uncommitted_index() {
let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(0));
drive_to_leader(&mut node);
let _ = node.step(Event::Propose(b"c".to_vec())).unwrap(); let _ = node
.step(Event::Snapshot {
index: 99,
data: vec![],
})
.unwrap();
assert_eq!(node.log().snapshot_index(), 0);
}
#[test]
fn test_leader_sends_install_snapshot_when_follower_is_behind() {
let mut log = MemoryLog::new();
log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
.unwrap();
log.apply_snapshot(&Snapshot::new(2, 1, b"snap".to_vec()))
.unwrap();
log.set_hard_state(HardState {
term: 1,
voted_for: Some(1),
})
.unwrap();
let mut node =
RaftNode::with_log(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5), log);
let mut elected = false;
for _ in 0..50 {
let _ = node.step(Event::Tick).unwrap();
if node.role() == Role::Candidate {
let _ = node
.step(Event::Message(Message::RequestVoteReply(
RequestVoteReply {
term: node.term(),
vote_granted: true,
from: 2,
},
)))
.unwrap();
}
if node.is_leader() {
elected = true;
break;
}
}
assert!(elected);
let actions = node
.step(Event::Message(Message::AppendEntriesReply(
AppendEntriesReply {
term: node.term(),
success: false,
from: 2,
match_index: 0,
conflict_index: 1, conflict_term: 0,
},
)))
.unwrap();
assert!(actions.iter().any(|a| matches!(
a,
Action::Send {
to: 2,
message: Message::InstallSnapshot(_)
}
)));
}
#[test]
fn test_follower_installs_snapshot_and_restores() {
let mut node = RaftNode::new(RaftConfig::new(5, [1]));
let actions = node
.step(Event::Message(Message::InstallSnapshot(InstallSnapshot {
term: 3,
leader: 1,
snapshot: Snapshot::new(8, 2, b"the state".to_vec()),
})))
.unwrap();
assert_eq!(node.log().snapshot_index(), 8);
assert_eq!(node.commit_index(), 8);
assert!(
actions
.iter()
.any(|a| matches!(a, Action::RestoreSnapshot { index: 8, .. }))
);
assert!(actions.iter().any(|a| matches!(
a,
Action::Send { message: Message::InstallSnapshotReply(r), .. } if r.last_index == 8
)));
}
#[test]
fn test_node_recovers_applied_position_from_snapshot() {
let mut log = MemoryLog::new();
log.apply_snapshot(&Snapshot::new(6, 2, b"s".to_vec()))
.unwrap();
let node = RaftNode::with_log(RaftConfig::single(1), log);
assert_eq!(node.commit_index(), 6);
assert_eq!(node.last_applied(), 6);
}
#[test]
fn test_rejected_vote_makes_no_durable_write() {
let mut log = SyncCountLog::default();
log.set_hard_state(HardState {
term: 5,
voted_for: Some(9),
})
.unwrap();
let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
let before = node.log().syncs();
let _ = node
.step(Event::Message(Message::RequestVote(RequestVote {
term: 3, candidate: 2,
last_log_index: 0,
last_log_term: 0,
force: false,
})))
.unwrap();
assert_eq!(node.log().syncs(), before, "a no-op vote must not sync");
}
fn membership_changed(actions: &[Action]) -> Option<Vec<NodeId>> {
actions.iter().find_map(|a| match a {
Action::MembershipChanged { members } => Some(members.clone()),
_ => None,
})
}
#[test]
fn test_node_reports_bootstrap_membership() {
let node = RaftNode::new(RaftConfig::new(1, [3, 2]));
assert_eq!(node.members(), &[1, 2, 3]); }
#[test]
fn test_add_server_adopts_config_immediately() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node);
let actions = node.step(Event::AddServer(2)).unwrap();
assert_eq!(node.members(), &[1, 2]);
assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
let last = node.log().last_index();
assert_eq!(node.log().entry(last).unwrap().members(), Some(vec![1, 2]));
}
#[test]
fn test_remove_server_adopts_config() {
let mut node = elect_multi_node_leader(); let actions = node.step(Event::RemoveServer(3)).unwrap();
assert_eq!(node.members(), &[1, 2]);
assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
}
#[test]
fn test_add_existing_member_is_noop() {
let mut node = elect_multi_node_leader();
let actions = node.step(Event::AddServer(2)).unwrap();
assert!(actions.is_empty());
assert_eq!(node.members(), &[1, 2, 3]);
}
#[test]
fn test_one_config_change_at_a_time() {
let mut node = RaftNode::new(RaftConfig::single(1));
drive_to_leader(&mut node);
let _ = node.step(Event::AddServer(2)).unwrap();
let err = node.step(Event::AddServer(3)).unwrap_err();
assert!(matches!(err, Error::ConfigInProgress));
}
#[test]
fn test_change_membership_rejected_on_follower() {
let mut node = RaftNode::new(RaftConfig::new(2, [1, 3]));
let err = node.step(Event::AddServer(4)).unwrap_err();
assert!(matches!(err, Error::NotLeader { .. }));
}
#[test]
fn test_membership_recovered_from_config_entry() {
let mut log = MemoryLog::new();
log.append(&[
LogEntry::new(1, 1, b"x".to_vec()),
LogEntry::config(1, 2, &[1, 2, 3, 4]),
])
.unwrap();
let node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
assert_eq!(node.members(), &[1, 2, 3, 4]);
}
#[test]
fn test_membership_recovered_from_snapshot_config() {
let mut log = MemoryLog::new();
log.apply_snapshot(&Snapshot::with_config(
5,
2,
vec![1, 2, 3, 4, 5],
b"s".to_vec(),
))
.unwrap();
let node = RaftNode::with_log(RaftConfig::single(1), log);
assert_eq!(node.members(), &[1, 2, 3, 4, 5]);
}
#[test]
fn test_follower_adopts_config_from_append() {
let mut node = RaftNode::new(RaftConfig::new(5, [1]));
let actions = node
.step(Event::Message(Message::AppendEntries(AppendEntries {
term: 2,
leader: 1,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![LogEntry::config(2, 1, &[1, 5, 9])],
leader_commit: 0,
})))
.unwrap();
assert_eq!(node.members(), &[1, 5, 9]);
assert_eq!(membership_changed(&actions), Some(vec![1, 5, 9]));
}
#[test]
fn test_timeout_now_triggers_immediate_election() {
let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(1000, 1000));
let actions = node
.step(Event::Message(Message::TimeoutNow(TimeoutNow {
term: 0,
leader: 2,
})))
.unwrap();
assert_eq!(node.role(), Role::Candidate);
assert!(actions.iter().any(|a| matches!(
a,
Action::Send {
message: Message::RequestVote(_),
..
}
)));
}
#[test]
fn test_transfer_to_caught_up_follower_sends_timeout_now() {
let mut node = elect_multi_node_leader(); let _ = node
.step(Event::Message(Message::AppendEntriesReply(
AppendEntriesReply {
term: node.term(),
success: true,
from: 2,
match_index: 0,
conflict_index: 0,
conflict_term: 0,
},
)))
.unwrap();
let actions = node.step(Event::TransferLeadership(2)).unwrap();
assert!(actions.iter().any(|a| matches!(
a,
Action::Send {
to: 2,
message: Message::TimeoutNow(_)
}
)));
}
#[test]
fn test_transfer_to_non_voter_is_noop() {
let mut node = elect_multi_node_leader();
let actions = node.step(Event::TransferLeadership(99)).unwrap();
assert!(actions.is_empty());
}
#[test]
fn test_non_voter_does_not_start_election() {
let mut log = MemoryLog::new();
log.append(&[LogEntry::config(1, 1, &[1, 2, 3])]).unwrap(); let mut node = RaftNode::with_log(
RaftConfig::new(5, [1, 2, 3]).with_election_timeout(2, 2),
log,
);
assert_eq!(node.members(), &[1, 2, 3]);
for _ in 0..50 {
let _ = node.step(Event::Tick).unwrap();
}
assert_eq!(node.role(), Role::Follower);
}
}