use crate::core::{ElectionCore, State};
use crate::error::{to_rpc_error, Result};
use crate::msg::Message;
use crate::rpc::HeartbeatRequest;
use crate::{ElectionType, Event, HeartbeatResponse, MoveLeaderRequest, Rpc};
use crossbeam_channel::{RecvTimeoutError, Sender};
use std::time::{Duration, Instant};
pub struct Leader<'a, T: ElectionType> {
core: &'a mut ElectionCore<T>,
next_heartbeat_timeout: Option<Instant>,
}
impl<'a, T: ElectionType> Leader<'a, T> {
#[inline]
pub fn new(core: &'a mut ElectionCore<T>) -> Self {
Self {
core,
next_heartbeat_timeout: None,
}
}
#[inline]
fn update_next_heartbeat_timeout(&mut self) {
let now = Instant::now();
self.next_heartbeat_timeout = Some(now + Duration::from_millis(self.core.options.heartbeat_interval()));
}
#[inline]
fn next_heartbeat_timeout(&mut self) -> Instant {
match self.next_heartbeat_timeout {
Some(instant) => instant,
None => {
self.update_next_heartbeat_timeout();
self.next_heartbeat_timeout.unwrap()
}
}
}
fn spawn_parallel_heartbeat(&self) {
let current_term = self.core.current_term();
for member in self.core.members.peers() {
let req = HeartbeatRequest {
target_node_id: member.clone(),
leader_id: self.core.node_id().clone(),
term: current_term,
};
let rpc = self.core.rpc.clone();
let tx = self.core.msg_tx.clone();
let node_id = self.core.node_id().clone();
let target_node_id = member.clone();
trace!(
"[{}][Term({})] send heartbeat to node({})",
node_id,
current_term,
member
);
let _ = self.core.spawn_task("election-heartbeat", move || {
match rpc.heartbeat(req) {
Ok(resp) => {
let _ = tx.send(Message::HeartbeatResponse(resp));
}
Err(e) => {
warn!(
"[{}][Term({})] failed to send heartbeat request to {}: {}",
node_id, current_term, target_node_id, e
);
}
}
});
}
}
#[inline]
fn handle_heartbeat_response(&mut self, resp: HeartbeatResponse<T>, set_prev_state: Option<&mut bool>) {
if self.core.node_id().ne(&resp.node_id) {
return;
}
if resp.term > self.core.current_term() {
info!(
"[{}][Term({})] revert to follower due to a newer term from heartbeat response",
self.core.node_id(),
self.core.current_term()
);
self.core.set_state(State::Follower, set_prev_state);
}
}
fn spawn_move_leader(&self, target_node_id: T::NodeId, tx: Sender<Result<()>>) {
let current_term = self.core.current_term();
let rpc = self.core.rpc.clone();
let node_id = self.core.node_id().clone();
let tx2 = tx.clone();
debug!(
"[{}][Term({})] send move_leader to node({})",
node_id, current_term, target_node_id
);
let req = MoveLeaderRequest {
target_node_id,
term: current_term,
};
let result = self.core.spawn_task("election-move-leader", move || {
match rpc.move_leader(req) {
Ok(_) => {
let _ = tx.send(Ok(()));
}
Err(e) => {
let _ = tx.send(Err(to_rpc_error::<T>(e)));
}
}
});
if let Err(e) = result {
let _ = tx2.send(Err(e));
}
}
pub fn run(mut self) {
self.core.increase_state_id();
let mut set_prev_state = Some(true);
self.core.in_moving_leader = false;
assert!(self.core.is_state(State::Leader));
let result = self.core.spawn_event_handling_task(Event::TransitToLeader {
term: self.core.current_term(),
caused_by_step_up: self.core.step_up_or_down,
});
self.core.step_up_or_down = false;
if result.is_err() {
error!(
"[{}][Term({})] failed to handle TransitToLeader event, so transit to follower",
self.core.node_id(),
self.core.current_term()
);
self.core.set_state(State::Follower, None);
return;
}
self.core.last_heartbeat_time = None;
self.core.next_election_timeout = None;
self.next_heartbeat_timeout = None;
self.core.current_leader = Some(self.core.node_id().clone());
self.core.report_metrics();
info!(
"[{}][Term({})] start the leader loop",
self.core.node_id(),
self.core.current_term()
);
self.spawn_parallel_heartbeat();
loop {
if !self.core.is_state(State::Leader) {
return;
}
let heartbeat_timeout = self.next_heartbeat_timeout();
match self.core.msg_rx.recv_deadline(heartbeat_timeout) {
Ok(msg) => match msg {
Message::HeartbeatRequest { req, tx } => {
let result = self.core.handle_heartbeat(req, set_prev_state.as_mut());
if let Err(ref e) = result {
debug!(
"[{}][Term({})] failed to handle heartbeat request: {}",
self.core.node_id(),
self.core.current_term(),
e
);
}
let _ = tx.send(result);
}
Message::HeartbeatResponse(resp) => {
self.handle_heartbeat_response(resp, set_prev_state.as_mut());
}
Message::VoteRequest { req, tx } => {
let result = self.core.handle_vote_request(req, set_prev_state.as_mut());
if let Err(ref e) = result {
debug!(
"[{}][Term({})] failed to handle vote request: {}",
self.core.node_id(),
self.core.current_term(),
e
);
}
let _ = tx.send(result);
}
Message::VoteResponse(_) => {
}
Message::Initialize { tx, .. } => {
self.core.reject_init_with_members(tx);
}
Message::UpdateOptions { options, tx } => {
info!(
"[{}][Term({})] election update options: {:?}",
self.core.node_id(),
self.core.current_term(),
options
);
self.core.update_options(options);
let _ = tx.send(Ok(()));
}
Message::Shutdown => {
info!(
"[{}][Term({})] election received shutdown message",
self.core.node_id(),
self.core.current_term()
);
self.core.set_state(State::Shutdown, set_prev_state.as_mut());
}
Message::EventHandlingResult {
event,
error,
term: _,
state_id,
} => {
if let Some(e) = error {
error!(
"[{}][Term({})] failed to handle event ({:?}): {}",
self.core.node_id(),
self.core.current_term(),
event,
e
);
if let Event::TransitToLeader { .. } = event {
if self.core.state_id() == state_id {
error!(
"[{}][Term({})] failed to handle TransitToLeader event, so transit to follower",
self.core.node_id(),
self.core.current_term()
);
self.core.set_state(State::Follower, None);
}
}
}
}
Message::MoveLeader { target_node, tx } => {
if !self.core.members.contains(&target_node) {
let _ = tx.send(Err(try_format_error!(NotAllowed, "{} is not a member", target_node)));
} else {
self.spawn_move_leader(target_node, tx);
}
}
Message::MoveLeaderRequest { tx, .. } => {
self.core.reject_move_leader(tx);
}
Message::StepUpToLeader { tx, .. } => {
let _ = tx.send(Ok(()));
}
Message::StepDownToFollower { tx } => {
debug!(
"[{}][Term({})] step down to follower",
self.core.node_id(),
self.core.current_term(),
);
self.core.set_state(State::Follower, set_prev_state.as_mut());
self.core.step_up_or_down = true;
let _ = tx.send(Ok(()));
}
},
Err(e) => match e {
RecvTimeoutError::Timeout => {
self.spawn_parallel_heartbeat();
self.next_heartbeat_timeout = None;
}
RecvTimeoutError::Disconnected => {
info!(
"[{}][Term({})] the election message channel is disconnected",
self.core.node_id(),
self.core.current_term()
);
self.core.set_state(State::Shutdown, set_prev_state.as_mut());
}
},
}
}
}
}