use crate::core::candidate::Candidate;
use crate::core::follower::Follower;
use crate::core::leader::Leader;
use crate::core::observer::Observer;
use crate::core::startup::Startup;
use crate::error::{to_storage_error, Error, Result};
use crate::member::MemberConfig;
use crate::metrics::{Metrics, MetricsReporter};
use crate::msg::Message;
use crate::rpc::{HeartbeatRequest, HeartbeatResponse, VoteRequest, VoteResponse};
use crate::storage::{HardState, Storage};
use crate::task::TaskSpawner;
use crate::util::TryToString;
use crate::{ElectionType, Event, EventHandler, Options, Thread, VoteFactor, VoteResult};
use crossbeam_channel::{Receiver, Sender};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use sync_wait_group::WaitGroup;
mod candidate;
mod follower;
mod leader;
mod observer;
mod startup;
/// Lifecycle state of the election state machine.
///
/// The `election-main` loop dispatches on this value, running the runner that
/// belongs to the current variant. The explicit `u8` discriminants are part of
/// the type's contract and must not be renumbered.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum State {
    /// Terminal state: the main loop emits `Event::Shutdown`, waits for
    /// outstanding tasks and exits.
    Shutdown = 0,
    /// Initial state the main loop starts in.
    Startup = 1,
    /// Following a leader.
    Follower = 2,
    /// Runs the candidate logic with its pre-vote flag set.
    PreCandidate = 3,
    /// Runs the candidate logic for a real vote.
    Candidate = 4,
    /// Acting as leader.
    Leader = 5,
    /// Observing the cluster.
    Observer = 6,
}
/// Mutable core of the election state machine.
///
/// The whole value is moved onto the `election-main` thread by `spawn` and is
/// driven from there by the per-state runners.
// NOTE: fields are dropped in declaration order; keep the order stable.
pub struct ElectionCore<T: ElectionType> {
    // Runtime options (election timeouts etc.); replaced by `update_options`.
    options: Options,
    // Member configuration; `current()` yields this node's id.
    members: MemberConfig<T>,
    // Used by `spawn_task` to run background tasks (e.g. event handling).
    task_spawner: Arc<T::TaskSpawner>,
    // Persists the hard state, provides the local vote factor, and is told to
    // prepare when a leader steps down.
    storage: T::Storage,
    // RPC transport; not used in this file — presumably used by the state
    // runners to talk to peers (TODO confirm).
    rpc: Arc<T::Rpc>,
    // Current state; the main loop dispatches on it.
    state: State,
    // State before the last transition recorded through `set_state` with a
    // `Some(true)` flag.
    prev_state: State,
    // Counter bumped by `increase_state_id`; attached to event-handling results.
    state_id: u64,
    // In-memory copy of the persisted term and vote.
    hard_state: HardState<T>,
    // Leader learned from the most recent accepted heartbeat, if any.
    current_leader: Option<T::NodeId>,
    // Not used in this file — presumably identifies the current vote round
    // (TODO confirm against the candidate runner).
    vote_id: u64,
    // Not used in this file — presumably set while a leader move is in progress.
    in_moving_leader: bool,
    // Not used in this file — presumably set during a requested step up/down.
    step_up_or_down: bool,
    // Monotonic and wall-clock time of the last accepted heartbeat; used to
    // reject vote requests arriving within `election_timeout_min`.
    last_heartbeat_time: Option<(Instant, SystemTime)>,
    // Current election deadline; `None` until lazily created.
    next_election_timeout: Option<Instant>,
    // Sender cloned into background tasks to report results back.
    msg_tx: Sender<Message<T>>,
    // Receiving end of the message channel; not read in this file.
    msg_rx: Receiver<Message<T>>,
    // User callback invoked for election events.
    event_handler: Arc<dyn EventHandler<T>>,
    // Sink for metrics snapshots and incremental updates.
    metrics_reporter: MetricsReporter<T>,
    // Each task spawned via `spawn_task` holds a clone; waited on at shutdown.
    task_wait_group: WaitGroup,
}
impl<T: ElectionType> ElectionCore<T> {
    /// Creates the election core for `node_id`.
    ///
    /// The core starts in [`State::Startup`] (as both the current and the
    /// previous state) with term 0, no recorded vote, no known leader, no
    /// heartbeat and no election deadline. Nothing is read from `storage` here.
    // Every argument is an independent collaborator injected by the caller,
    // so bundling them would only move the problem elsewhere.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub(crate) fn new(
        options: Options,
        node_id: T::NodeId,
        task_spawner: Arc<T::TaskSpawner>,
        storage: T::Storage,
        rpc: Arc<T::Rpc>,
        msg_tx: Sender<Message<T>>,
        msg_rx: Receiver<Message<T>>,
        event_handler: Arc<dyn EventHandler<T>>,
        metrics_reporter: MetricsReporter<T>,
    ) -> Self {
        ElectionCore {
            options,
            members: MemberConfig::new(node_id),
            state: State::Startup,
            prev_state: State::Startup,
            state_id: 0,
            hard_state: HardState {
                current_term: 0,
                voted_for: None,
            },
            current_leader: None,
            vote_id: 0,
            task_spawner,
            storage,
            rpc,
            last_heartbeat_time: None,
            next_election_timeout: None,
            msg_tx,
            msg_rx,
            event_handler,
            metrics_reporter,
            task_wait_group: WaitGroup::new(),
            in_moving_leader: false,
            step_up_or_down: false,
        }
    }

    /// Moves the core onto a new thread named `election-main` that runs the
    /// election state machine until it shuts down.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread name cannot be allocated or if
    /// `T::Thread::spawn` fails.
    #[inline]
    pub fn spawn(self) -> Result<T::Thread> {
        T::Thread::spawn("election-main".try_to_string()?, move || self.main())
    }

    /// Body of the `election-main` thread.
    ///
    /// Repeatedly runs the runner for the current [`State`]; each runner
    /// borrows the core mutably and may change the state before returning.
    /// Once the state is [`State::Shutdown`], an `Event::Shutdown` is
    /// dispatched and the thread waits on `task_wait_group` (of which every
    /// task started through `spawn_task` holds a clone) before returning.
    fn main(mut self) {
        info!(
            "[{}][Term({})] start election main thread",
            self.node_id(),
            self.current_term()
        );
        self.set_state(State::Startup, None);
        loop {
            match self.state {
                State::Shutdown => {
                    // A spawn failure is already logged by
                    // `spawn_event_handling_task`; shutdown proceeds regardless.
                    let _result = self.spawn_event_handling_task(Event::Shutdown);
                    self.task_wait_group.wait();
                    info!(
                        "[{}][Term({})] election has shutdown",
                        self.node_id(),
                        self.current_term()
                    );
                    return;
                }
                State::Startup => Startup::new(&mut self).run(),
                State::Follower => Follower::new(&mut self).run(),
                State::PreCandidate => Candidate::new(&mut self, true).run(),
                State::Candidate => Candidate::new(&mut self, false).run(),
                State::Leader => Leader::new(&mut self).run(),
                State::Observer => Observer::new(&mut self).run(),
            }
        }
    }

    /// Replaces the runtime options.
    #[inline]
    fn update_options(&mut self, options: Options) {
        self.options = options;
    }

    /// Id of this node, as held by the member configuration.
    #[inline]
    fn node_id(&self) -> &T::NodeId {
        self.members.current()
    }

    /// Current state.
    #[inline]
    fn state(&self) -> State {
        self.state
    }

    /// State recorded before the last flagged transition (see `set_state`).
    #[inline]
    fn prev_state(&self) -> State {
        self.prev_state
    }

    /// Whether the current state equals `state`.
    #[inline]
    fn is_state(&self, state: State) -> bool {
        self.state == state
    }

    /// Switches to `state`.
    ///
    /// If `set_prev_state` is `Some(true)`, the state being replaced is first
    /// saved as `prev_state` and the flag is cleared. Later transitions that
    /// reuse the same flag therefore leave `prev_state` untouched.
    #[inline]
    fn set_state(&mut self, state: State, set_prev_state: Option<&mut bool>) {
        if let Some(set_prev) = set_prev_state {
            if *set_prev {
                self.prev_state = self.state;
                *set_prev = false;
            }
        }
        self.state = state;
    }

    /// Bumps the state id attached to subsequently spawned event tasks.
    #[inline]
    fn increase_state_id(&mut self) {
        self.state_id += 1;
    }

    /// Current state id.
    #[inline]
    fn state_id(&self) -> u64 {
        self.state_id
    }

    /// Current term, from the in-memory hard state.
    #[inline]
    fn current_term(&self) -> u64 {
        self.hard_state.current_term
    }

    /// Replaces the in-memory hard state without persisting it.
    #[inline]
    fn set_hard_state(&mut self, state: HardState<T>) {
        self.hard_state = state;
    }

    /// Returns the election deadline.
    ///
    /// If none is set, a random timeout from the options is added to now and
    /// stored.
    #[inline]
    fn next_election_timeout(&mut self) -> Instant {
        match self.next_election_timeout {
            Some(instant) => instant,
            None => {
                let timeout = self.options.random_election_timeout();
                let instant = Instant::now() + Duration::from_millis(timeout);
                self.next_election_timeout = Some(instant);
                instant
            }
        }
    }

    /// Re-arms the election deadline to now plus a fresh random timeout.
    ///
    /// When `heartbeat` is true, the current monotonic and wall-clock times are
    /// also recorded as the last heartbeat time. The wall-clock time is
    /// published to metrics.
    #[inline]
    fn update_next_election_timeout(&mut self, heartbeat: bool) {
        let now = Instant::now();
        self.next_election_timeout = Some(now + Duration::from_millis(self.options.random_election_timeout()));
        if heartbeat {
            let system_now = SystemTime::now();
            self.last_heartbeat_time = Some((now, system_now));
            self.update_metrics(|metrics| {
                metrics.last_heartbeat_time = Some(system_now);
            });
        }
    }

    /// Verifies that a request addressed to `node_id` is meant for this node.
    ///
    /// # Errors
    ///
    /// `Error::InvalidTarget` if the ids differ, or an error if the message
    /// cannot be formatted.
    #[inline]
    fn check_node(&self, node_id: &T::NodeId) -> Result<()> {
        if self.node_id() != node_id {
            Err(Error::InvalidTarget(try_format!(
                "given node id({}) is not the same as this node({})",
                node_id,
                self.node_id()
            )?))
        } else {
            Ok(())
        }
    }

    /// Advances to `new_term` with `voted_for` if it is strictly newer.
    ///
    /// The new hard state is persisted before it replaces the in-memory copy,
    /// so a failed save leaves the in-memory state unchanged. If `new_term` is
    /// not newer, this is a no-op.
    ///
    /// # Errors
    ///
    /// Propagates storage failures.
    #[inline]
    fn update_current_term(&mut self, new_term: u64, voted_for: Option<T::NodeId>) -> Result<()> {
        if new_term > self.current_term() {
            let hard_state = HardState {
                current_term: new_term,
                voted_for,
            };
            self.storage
                .save_hard_state(&hard_state)
                .map_err(to_storage_error::<T>)?;
            self.hard_state = hard_state;
        }
        Ok(())
    }

    /// Persists the current in-memory hard state.
    ///
    /// # Errors
    ///
    /// Propagates storage failures.
    #[inline]
    fn save_hard_state(&mut self) -> Result<()> {
        self.storage
            .save_hard_state(&self.hard_state)
            .map_err(to_storage_error::<T>)
    }

    /// Runs `f` on a task named `name` via the task spawner.
    ///
    /// A clone of `task_wait_group` is held until `f` returns, so the shutdown
    /// path's `task_wait_group.wait()` accounts for this task.
    ///
    /// # Errors
    ///
    /// Fails if the name cannot be allocated or the spawner refuses the task.
    #[inline]
    fn spawn_task<F>(&self, name: &str, f: F) -> Result<()>
    where
        F: FnOnce(),
        F: Send + 'static,
    {
        let wg = self.task_wait_group.clone();
        self.task_spawner.spawn(name.try_to_string()?, move || {
            f();
            drop(wg);
        })
    }

    /// Processes a leader heartbeat addressed to this node.
    ///
    /// A heartbeat from an older term is answered with the current term and
    /// otherwise ignored. For any other heartbeat, this method:
    /// - pushes back the election deadline and records the heartbeat time;
    /// - adopts and persists the heartbeat's term if it differs (it can only
    ///   be newer here), clearing the vote;
    /// - records the sender as leader, emitting `Event::ChangeLeader` if the
    ///   leader changed;
    /// - moves any state other than Follower/Observer to Follower, forwarding
    ///   `set_prev_state` to `set_state`.
    ///
    /// # Errors
    ///
    /// Fails if the heartbeat targets another node or persisting the term fails.
    fn handle_heartbeat(
        &mut self,
        msg: HeartbeatRequest<T>,
        set_prev_state: Option<&mut bool>,
    ) -> Result<HeartbeatResponse<T>> {
        self.check_node(&msg.target_node_id)?;
        if msg.term < self.current_term() {
            let current_term = self.current_term();
            debug!(
                "[{}][Term({})] heartbeat term({}) from leader({}) is less than current term({})",
                self.node_id(),
                current_term,
                msg.term,
                msg.leader_id,
                current_term
            );
            return Ok(HeartbeatResponse {
                node_id: self.node_id().clone(),
                term: current_term,
            });
        }
        self.update_next_election_timeout(true);
        if self.current_term() != msg.term {
            self.update_current_term(msg.term, None)?;
            self.update_metrics(|metrics| {
                metrics.current_term = msg.term;
            });
        }
        if self.current_leader.as_ref() != Some(&msg.leader_id) {
            match self.current_leader.as_ref() {
                None => {
                    info!(
                        "[{}][Term({})] change leader to {}",
                        self.node_id(),
                        self.current_term(),
                        msg.leader_id
                    );
                }
                Some(old_leader) => {
                    info!(
                        "[{}][Term({})] change leader from {} to {}",
                        self.node_id(),
                        self.current_term(),
                        old_leader,
                        msg.leader_id
                    );
                }
            }
            self.current_leader = Some(msg.leader_id.clone());
            // A spawn failure is logged inside; the heartbeat is still accepted.
            let _result = self.spawn_event_handling_task(Event::ChangeLeader(msg.leader_id.clone()));
            self.update_metrics(|metrics| {
                metrics.current_leader = Some(msg.leader_id);
            });
        }
        if !self.is_state(State::Follower) && !self.is_state(State::Observer) {
            // NOTE(review): unlike `handle_vote_request`, a Leader demoted here
            // does not call `storage.prepare_step_down()` — confirm the leader
            // runner takes care of it.
            info!(
                "[{}][Term({})] received valid heartbeat in {:?} state, so transit to follower",
                self.node_id(),
                self.current_term(),
                self.state()
            );
            self.set_state(State::Follower, set_prev_state);
        }
        Ok(HeartbeatResponse {
            node_id: self.node_id().clone(),
            term: self.current_term(),
        })
    }

    /// Builds the reply to `req` with this node's id, current term and
    /// `vote_result`. The candidate id, vote id and pre-vote flag are echoed
    /// from the request.
    #[inline]
    fn create_vote_response(&self, req: VoteRequest<T>, vote_result: VoteResult) -> VoteResponse<T> {
        VoteResponse {
            node_id: self.node_id().clone(),
            candidate_id: req.candidate_id,
            vote_id: req.vote_id,
            term: self.current_term(),
            pre_vote: req.pre_vote,
            vote_result,
        }
    }

    /// Decides whether to grant a (pre-)vote to the requesting candidate.
    ///
    /// The request is rejected with `NotGranted` if its term is older than
    /// ours, or if `msg.move_leader` is unset and a heartbeat arrived within
    /// `election_timeout_min`.
    ///
    /// A newer term is then adopted and persisted with the vote cleared. A
    /// node that is neither Follower nor Observer steps down to Follower; a
    /// Leader first calls `storage.prepare_step_down()`.
    ///
    /// Next, the local vote factor from storage is compared with the
    /// candidate's, and a non-granted result is returned as-is. Pre-votes are
    /// answered without recording anything. A real vote is granted, and
    /// persisted, only if no vote has been recorded for the current term.
    ///
    /// # Errors
    ///
    /// Fails if the request targets another node, or on storage failures.
    fn handle_vote_request(
        &mut self,
        msg: VoteRequest<T>,
        mut set_prev_state: Option<&mut bool>,
    ) -> Result<VoteResponse<T>> {
        self.check_node(&msg.target_node_id)?;
        if msg.term < self.current_term() {
            debug!(
                "[{}][Term({})] vote term({}) from candidate({}) is less than current term({})",
                self.node_id(),
                self.current_term(),
                msg.term,
                msg.candidate_id,
                self.current_term()
            );
            return Ok(self.create_vote_response(msg, VoteResult::NotGranted));
        }
        if !msg.move_leader {
            if let Some((instant, _)) = self.last_heartbeat_time {
                let now = Instant::now();
                let delta = now.duration_since(instant);
                if (delta.as_millis() as u64) <= self.options.election_timeout_min() {
                    debug!(
                        "[{}][Term({})] reject vote request received within election timeout minimum",
                        self.node_id(),
                        self.current_term()
                    );
                    return Ok(self.create_vote_response(msg, VoteResult::NotGranted));
                }
            }
        }
        if msg.term > self.current_term() {
            // Capture the term being left: once `update_current_term` succeeds,
            // `current_term()` already equals `msg.term`, which would make the
            // log lines below compare the new term with itself.
            let prev_term = self.current_term();
            self.update_current_term(msg.term, None)?;
            if !self.is_state(State::Follower) && !self.is_state(State::Observer) {
                if self.is_state(State::Leader) {
                    log::info!("[{}] [Term({})] prepare step down", self.node_id(), self.current_term());
                    self.storage.prepare_step_down();
                }
                // `as_deref_mut` reborrows the flag rather than moving it, so
                // `set_prev_state` is still usable further down; clippy flags
                // the call as needless regardless.
                #[allow(clippy::needless_option_as_deref)]
                self.set_state(State::Follower, set_prev_state.as_deref_mut());
                self.update_next_election_timeout(false);
                info!(
                    "[{}][Term({})] vote request term({}) is greater than current term({}), so transit to follower",
                    self.node_id(),
                    self.current_term(),
                    msg.term,
                    prev_term
                );
            } else {
                info!(
                    "[{}][Term({})] vote request term({}) is greater than current term({})",
                    self.node_id(),
                    self.current_term(),
                    msg.term,
                    prev_term
                );
                self.update_metrics(|metrics| {
                    metrics.current_term = msg.term;
                });
            }
        }
        let current_vote_factor = self.storage.load_vote_factor().map_err(to_storage_error::<T>)?;
        let vote_result = current_vote_factor.vote(&msg.factor);
        if !vote_result.is_granted() {
            debug!(
                "[{}][Term({})] reject vote request as candidate({})'s vote result is {:?}",
                self.node_id(),
                self.current_term(),
                msg.candidate_id,
                vote_result
            );
            return Ok(self.create_vote_response(msg, vote_result));
        }
        if msg.pre_vote {
            // Pre-votes never record `voted_for`.
            debug!(
                "[{}][Term({})] voted for pre-candidate({})",
                self.node_id(),
                self.current_term(),
                msg.candidate_id
            );
            return Ok(self.create_vote_response(msg, vote_result));
        }
        match &self.hard_state.voted_for {
            None => {
                if !self.is_state(State::Follower) && !self.is_state(State::Observer) {
                    self.set_state(State::Follower, set_prev_state);
                    self.update_next_election_timeout(false);
                    debug!(
                        "[{}][Term({})] granted vote for candidate({}) and revert to follower",
                        self.node_id(),
                        self.current_term(),
                        msg.candidate_id
                    );
                } else {
                    debug!(
                        "[{}][Term({})] granted vote for candidate({})",
                        self.node_id(),
                        self.current_term(),
                        msg.candidate_id
                    );
                }
                self.hard_state.voted_for = Some(msg.candidate_id.clone());
                self.save_hard_state()?;
                Ok(self.create_vote_response(msg, vote_result))
            }
            // NOTE(review): Raft grants again when `voted_for` already equals
            // the requesting candidate (e.g. a retried request). This rejects
            // it instead — confirm that is intended; the candidate's vote
            // counting is not visible from here.
            Some(candidate_id) => {
                debug!(
                    "[{}][Term({})] reject vote request for candidate({}) because already voted for node({})",
                    self.node_id(),
                    self.current_term(),
                    msg.candidate_id,
                    candidate_id
                );
                Ok(self.create_vote_response(msg, VoteResult::NotGranted))
            }
        }
    }

    /// Runs `event_handler.handle_event(event)` on a background task.
    ///
    /// When the handler returns, its error (if any) is sent back over `msg_tx`
    /// as `Message::EventHandlingResult`, together with the term and state id
    /// that were current at spawn time. This is presumably so the receiver can
    /// ignore stale results. A failed send is ignored.
    ///
    /// # Errors
    ///
    /// Returns (and logs) the error if the task could not be spawned.
    #[inline]
    fn spawn_event_handling_task(&self, event: Event<T>) -> Result<()> {
        let handler = self.event_handler.clone();
        let ev = event.clone();
        let tx = self.msg_tx.clone();
        let term = self.current_term();
        let state_id = self.state_id();
        let result = self.spawn_task("election-event-handler", move || {
            let result = handler.handle_event(ev.clone());
            let error = result.err();
            let _ = tx.send(Message::EventHandlingResult {
                event: ev,
                error,
                term,
                state_id,
            });
        });
        if let Err(ref e) = result {
            error!(
                "[{}][Term({})] failed to spawn task for event ({:?}): {}",
                self.node_id(),
                self.current_term(),
                event,
                e
            );
        }
        result
    }

    /// Replies on `tx` that init-with-members is not allowed in the current
    /// state; a closed reply channel is ignored.
    #[inline]
    fn reject_init_with_members(&self, tx: Sender<Result<()>>) {
        let _ = tx.send(Err(try_format_error!(
            NotAllowed,
            "can't init with members in {:?} state",
            self.state(),
        )));
    }

    /// Replies on `tx` that moving the leader is not allowed in the current
    /// state; a closed reply channel is ignored.
    #[inline]
    fn reject_move_leader(&self, tx: Sender<Result<()>>) {
        let _ = tx.send(Err(try_format_error!(
            NotAllowed,
            "can't move leader in {:?} state",
            self.state(),
        )));
    }

    /// Replies on `tx` that stepping up to leader is not allowed in the current
    /// state; a closed reply channel is ignored.
    #[inline]
    fn reject_step_up_to_leader(&self, tx: Sender<Result<()>>) {
        let _ = tx.send(Err(try_format_error!(
            NotAllowed,
            "can't step up to leader in {:?} state",
            self.state(),
        )));
    }

    /// Replies on `tx` that stepping down to follower is not allowed in the
    /// current state; a closed reply channel is ignored.
    #[inline]
    fn reject_step_down_to_follower(&self, tx: Sender<Result<()>>) {
        let _ = tx.send(Err(try_format_error!(
            NotAllowed,
            "can't step down to follower in {:?} state",
            self.state(),
        )));
    }

    /// Publishes a full metrics snapshot: the current state, term and leader,
    /// plus the wall-clock time of the last heartbeat.
    #[inline]
    fn report_metrics(&mut self) {
        self.metrics_reporter.report(Metrics {
            state: self.state(),
            current_term: self.current_term(),
            current_leader: self.current_leader.clone(),
            last_heartbeat_time: self.last_heartbeat_time.as_ref().map(|t| t.1),
        })
    }

    /// Applies the in-place modification `f` to the reported metrics.
    #[inline]
    fn update_metrics<F>(&mut self, f: F)
    where
        F: FnOnce(&mut Metrics<T>),
    {
        self.metrics_reporter.update(f)
    }
}