use super::*;
/// Effect that tries to promote this node to leader by running a pre-vote round
/// followed by a real election (see `exec`).
pub struct Effect {
/// Source of the ballot, the election state, the vote sequencer and the driver
/// used to identify this node and to connect to other nodes.
pub voter: Voter,
/// Source of the cluster membership; also the target of the replication-progress
/// reset performed after a won election.
pub peers: Peers,
/// Log access: read for the clock of the last entry, and appended to with a
/// barrier command after a won election.
pub state_machine: StateMachine,
}
impl Effect {
/// Tries to promote this node to leader for the term following the current ballot term.
///
/// The attempt has two phases:
/// 1. A pre-vote round at `cur_term + 1`. The local ballot is only read here. If the
///    round does not succeed the attempt ends and `Ok(())` is returned.
/// 2. The real election: the ballot is rewritten with the new term and a vote for
///    this node, the election state becomes `Candidate`, votes are requested, and
///    `post_election` settles the final election state.
///
/// `force_vote` is forwarded unchanged in every vote request.
///
/// # Errors
///
/// Returns an error if the vote sequencer cannot be acquired, if reading or writing
/// the ballot fails, if the ballot term differs between the two phases, or if
/// `post_election` fails. An error from `request_votes` itself is logged and treated
/// as a lost round instead of being propagated, so that a candidate always falls
/// back through `post_election`.
pub async fn exec(self, force_vote: bool) -> Result<()> {
    // Keep the acquired value alive for the whole attempt; it is released when `_g`
    // is dropped at the end of this function.
    let _g = self.voter.vote_sequencer.try_acquire()?;
    info!("try to promote to leader (force={force_vote})");

    let pre_vote_term = self.voter.read_ballot().await?.cur_term + 1;

    info!("start pre-vote. try promote at term {pre_vote_term}");
    let pre_vote_ok = match self.request_votes(pre_vote_term, force_vote, true).await {
        Ok(granted) => granted,
        Err(e) => {
            // Surface internal failures so they are distinguishable from a lost vote.
            info!("pre-vote could not be carried out at term {pre_vote_term}: {e}");
            false
        }
    };
    if !pre_vote_ok {
        info!("pre-vote failed for term {pre_vote_term}");
        return Ok(());
    }

    let vote_term = {
        let mut new_ballot = self.voter.read_ballot().await?;
        let vote_term = new_ballot.cur_term + 1;
        // The ballot term must not have changed while the pre-vote was in flight.
        ensure!(vote_term == pre_vote_term);
        new_ballot.cur_term = vote_term;
        new_ballot.voted_for = Some(self.voter.driver.self_node_id());
        self.voter.write_ballot(new_ballot).await?;
        self.voter.write_election_state(ElectionState::Candidate);
        vote_term
    };

    info!("start election. try promote at term {vote_term}");
    let elected = match self.request_votes(vote_term, force_vote, false).await {
        Ok(granted) => granted,
        Err(e) => {
            // Not propagated: the node is already `Candidate` and must still go
            // through `post_election` to leave that state.
            info!("election could not be carried out at term {vote_term}: {e}");
            false
        }
    };
    self.post_election(vote_term, elected).await?;
    Ok(())
}
/// Asks every other cluster member for its vote and reports the outcome.
///
/// A `RequestVote` carrying this node's id, the clock of its last log entry,
/// `vote_term`, `force_vote` and `pre_vote` is prepared for each member other than
/// this node. The requests are handed to `quorum::join` together with the number of
/// grants required; its result is returned. A request whose RPC fails counts as a
/// vote that was not granted.
///
/// # Errors
///
/// Returns an error if this node is not part of the current membership, or if the
/// last log index or the last log entry cannot be read.
async fn request_votes(
    &self,
    vote_term: Term,
    force_vote: bool,
    pre_vote: bool,
) -> Result<bool> {
    let (peers_to_ask, votes_needed) = {
        let membership = self.peers.read_membership();
        let self_id = self.voter.driver.self_node_id();
        ensure!(membership.contains(&self_id));

        let cluster_size = membership.len();
        let peers_to_ask: Vec<_> = membership
            .into_iter()
            .filter(|id| *id != self_id)
            .collect();

        // One less than a majority of the whole cluster — presumably because this
        // node counts as a vote for itself (confirm against `quorum::join`).
        let majority = cluster_size / 2 + 1;
        (peers_to_ask, majority - 1)
    };

    // Clock of the last entry in the local log, sent along with every request.
    let log_last_clock = {
        let last_log_index = self.state_machine.get_log_last_index().await?;
        let last_entry = self.state_machine.get_entry(last_log_index).await?;
        last_entry.this_clock
    };

    let mut vote_requests = Vec::with_capacity(peers_to_ask.len());
    for endpoint in peers_to_ask {
        let candidate_id = self.voter.driver.self_node_id();
        let conn = self.voter.driver.connect(endpoint);
        vote_requests.push(async move {
            let req = request::RequestVote {
                candidate_id,
                candidate_clock: log_last_clock,
                vote_term,
                force_vote,
                pre_vote,
            };
            // An RPC error is treated the same as a refused vote.
            conn.request_vote(req).await.unwrap_or(false)
        });
    }

    Ok(quorum::join(votes_needed, vote_requests).await)
}
/// Applies the outcome of an election round held at `vote_term`.
///
/// When `ok` is `false` the election state is set to `Follower`. When `ok` is `true`
/// a `Command::Barrier(vote_term)` entry is appended to the log, the peers'
/// replication progress is reset with the index of that entry, and the election
/// state is set to `Leader`.
///
/// # Errors
///
/// Returns an error if appending the barrier entry fails; in that case the election
/// state is not modified by this function.
async fn post_election(&self, vote_term: Term, ok: bool) -> Result<()> {
    if !ok {
        info!("failed to become leader. now back to follower");
        self.voter.write_election_state(ElectionState::Follower);
        return Ok(());
    }

    info!("got enough votes from the cluster. promoted to leader");

    // Queue a barrier entry tagged with the term that was just won.
    let append_barrier = state_machine::effect::append_entry::Effect {
        state_machine: self.state_machine.clone(),
    };
    let barrier = Command::serialize(Command::Barrier(vote_term));
    let index = append_barrier.exec(barrier, Some(vote_term)).await?;
    info!("noop barrier is queued at index({index}) (term={vote_term})");

    // Reset the peers' progress using the index of the barrier entry.
    let reset_progress = peers::effect::reset_progress::Effect {
        peers: self.peers.clone(),
    };
    reset_progress.exec(index);

    self.voter.write_election_state(ElectionState::Leader);
    Ok(())
}
}