use std::time::Duration;
use display_more::DisplayOptionExt;
use validit::Valid;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftTypeConfig;
use crate::core::ServerState;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::sm;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineOutput;
use crate::engine::Respond;
use crate::engine::engine_config::EngineConfig;
use crate::engine::handler::establish_handler::EstablishHandler;
use crate::engine::handler::following_handler::FollowingHandler;
use crate::engine::handler::leader_handler::LeaderHandler;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::handler::vote_handler::VoteHandler;
use crate::entry::RaftEntry;
use crate::entry::payload::EntryPayload;
use crate::errors::ForwardToLeader;
use crate::errors::InitializeError;
use crate::errors::NotAllowed;
use crate::errors::NotInMembers;
use crate::errors::RejectAppendEntries;
use crate::proposer::Candidate;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::proposer::leader_state::CandidateState;
use crate::raft::LogSegment;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::stream_append::StreamAppendResult;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::LeaderIdOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::SnapshotMetaOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::TermOf;
use crate::type_config::alias::VoteOf;
use crate::vote::RaftLeaderId;
use crate::vote::RaftTerm;
use crate::vote::RaftVote;
use crate::vote::raft_vote::RaftVoteExt;
/// Raft protocol engine.
///
/// The engine mutates its in-memory [`RaftState`] and records the actions it wants carried out
/// (saving votes, sending vote requests, responding to callers, applying entries, ...) as
/// [`Command`]s pushed into `output`, instead of performing that work itself. Presumably a
/// runtime drains `output` and executes the commands — TODO confirm against the caller.
#[derive(Debug)]
pub(crate) struct Engine<C, SM = ()>
where C: RaftTypeConfig
{
    /// Engine configuration; `config.id` is the id of this node.
    pub(crate) config: EngineConfig<C>,
    /// The Raft state, wrapped in `validit::Valid` (presumably validated on access — TODO confirm).
    pub(crate) state: Valid<RaftState<C>>,
    /// Set when a vote response reports a last log id greater than this node's; cleared by
    /// `reset_greater_log`.
    pub(crate) seen_greater_log: bool,
    /// Leader state; `Some` while this node acts as the leader.
    pub(crate) leader: LeaderState<C>,
    /// State of an in-progress election; set by `new_candidate` and taken by `establish_leader`.
    pub(crate) candidate: CandidateState<C>,
    /// Commands emitted by the engine.
    pub(crate) output: EngineOutput<C, SM>,
}
impl<C, SM> Engine<C, SM>
where C: RaftTypeConfig
{
    /// Creates an engine from an initial [`RaftState`] and an [`EngineConfig`].
    ///
    /// The engine starts with no leader, no candidate, `seen_greater_log` cleared, and an output
    /// buffer built with `EngineOutput::new(4096)` (presumably a capacity hint — TODO confirm).
    pub(crate) fn new(init_state: RaftState<C>, config: EngineConfig<C>) -> Self {
        Self {
            config,
            state: Valid::new(init_state),
            seen_greater_log: false,
            leader: None,
            candidate: None,
            output: EngineOutput::new(4096),
        }
    }

    /// Replaces the current candidate with a new one for `vote` and returns it.
    ///
    /// The candidate is seeded with the current time, this node's last log id, and the quorum
    /// set and learner ids of the effective membership.
    pub(crate) fn new_candidate(&mut self, vote: VoteOf<C>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
        let now = C::now();
        let last_log_id = self.state.last_log_id().cloned();

        let membership = self.state.membership_state.effective().membership();

        self.candidate = Some(Candidate::new(
            now,
            vote,
            last_log_id,
            membership.to_quorum_set(),
            membership.learner_ids(),
            self.state.progress_id_gen.clone(),
        ));

        // `self.candidate` was set to `Some` just above.
        self.candidate.as_mut().unwrap()
    }

    /// Determines the initial server state when the node starts.
    ///
    /// If the stored state says this node is the leader, the vote handler re-derives the internal
    /// server state. Otherwise the node becomes a `Follower` if it is a voter in the effective
    /// membership, or a `Learner` if not.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn startup(&mut self) {
        tracing::info!(
            "startup begin: state: {:?}, is_leader: {}, is_voter: {}",
            self.state,
            self.state.is_leader(&self.config.id),
            self.state.membership_state.effective().is_voter(&self.config.id)
        );

        if self.state.is_leader(&self.config.id) {
            self.vote_handler().update_internal_server_state();
            return;
        }

        let server_state = if self.state.membership_state.effective().is_voter(&self.config.id) {
            ServerState::Follower
        } else {
            ServerState::Learner
        };

        self.state.server_state = server_state;

        tracing::info!(
            "startup done, id={}, target_state: {:?}",
            self.config.id,
            self.state.server_state
        );
    }

    /// Initializes an empty node with `membership`.
    ///
    /// Sets a committed vote for this node at the default term and appends the membership as the
    /// entry at index 0.
    ///
    /// # Errors
    /// - `NotAllowed` (converted into [`InitializeError`]) if the node is already initialized.
    /// - `NotInMembers` (converted into [`InitializeError`]) if this node is not a voter in
    ///   `membership`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn initialize(&mut self, membership: Membership<C::NodeId, C::Node>) -> Result<(), InitializeError<C>> {
        self.check_initialize()?;

        self.check_members_contain_me(&membership)?;

        let leader_id = LeaderIdOf::<C>::new(TermOf::<C>::default(), self.config.id.clone());
        let vote = <VoteOf<C> as RaftVote>::from_leader_id(leader_id.clone(), true);
        self.state.vote.update(C::now(), Duration::default(), vote);

        let log_id = LogIdOf::<C>::new(leader_id.to_committed(), 0);
        let entry = C::Entry::new(log_id, EntryPayload::Membership(membership));

        // The vote set above is committed, which is what `following_handler` expects.
        self.following_handler().do_append_entries(vec![entry]);

        Ok(())
    }

    /// Starts an election for the next term.
    ///
    /// Creates a candidate with a non-committed vote for this node, adopts that vote locally,
    /// emits a `SendVote` command, and refreshes the server state.
    #[tracing::instrument(level = "debug", skip(self))]
    pub(crate) fn elect(&mut self) {
        let new_term = self.state.vote.term().next();
        let leader_id = LeaderIdOf::<C>::new(new_term, self.config.id.clone());
        let new_vote = VoteOf::<C>::from_leader_id(leader_id, false);

        // NOTE(review): the candidate is created before the vote is updated. `vote_handler()` is
        // given `&mut self.candidate`, so this ordering presumably matters — do not reorder.
        let candidate = self.new_candidate(new_vote.clone());
        tracing::info!("{}: new candidate: {}", func_name!(), candidate);

        let last_log_id = candidate.last_log_id().cloned();

        // `new_vote` has a strictly greater term than the local vote, so it is expected to be
        // accepted.
        self.vote_handler().update_vote(&new_vote).unwrap();

        self.output.push_command(Command::SendVote {
            vote_req: VoteRequest::new(new_vote, last_log_id),
        });

        self.server_state_handler().update_server_state_if_changed();
    }

    /// Returns the leader state, if this node is the leader.
    pub(crate) fn leader_ref(&self) -> Option<&Leader<C, LeaderQuorumSet<C>>> {
        self.leader.as_deref()
    }

    /// Returns the leader state mutably, if this node is the leader.
    pub(crate) fn leader_mut(&mut self) -> Option<&mut Leader<C, LeaderQuorumSet<C>>> {
        self.leader.as_deref_mut()
    }

    /// Returns the candidate state, if an election is in progress.
    pub(crate) fn candidate_ref(&self) -> Option<&Candidate<C, LeaderQuorumSet<C>>> {
        self.candidate.as_ref()
    }

    /// Returns the candidate state mutably, if an election is in progress.
    pub(crate) fn candidate_mut(&mut self) -> Option<&mut Candidate<C, LeaderQuorumSet<C>>> {
        self.candidate.as_mut()
    }

    /// Handles a vote request and returns the response to send back.
    ///
    /// The request is rejected when:
    /// - the local vote is committed and its leader lease has not expired yet, or
    /// - the request's last log id is not `>=` this node's last log id.
    ///
    /// Otherwise the engine tries to adopt `req.vote`, and the vote is granted if and only if
    /// that update succeeds. The response always carries this node's current vote and last log id.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C>) -> VoteResponse<C> {
        let now = C::now();
        let local_leased_vote = &self.state.vote;

        tracing::info!("handle vote request: req: {}", req);
        tracing::info!(
            "handle vote request: my_vote: {}, my_last_log_id: {}, lease: {}",
            **local_leased_vote,
            self.state.last_log_id().display(),
            local_leased_vote.display_lease_info(now)
        );

        // While a committed vote's lease is still alive, refuse to vote for anyone else.
        if local_leased_vote.is_committed()
            && !local_leased_vote.is_expired(now, Duration::from_millis(0))
        {
            tracing::info!(
                "reject vote-request: leader lease has not yet expired: {}",
                local_leased_vote.display_lease_info(now)
            );
            return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false);
        }

        if req.last_log_id.as_ref() >= self.state.last_log_id() {
            // The requester's log is at least as up-to-date as ours; go on to the vote check.
        } else {
            tracing::info!(
                "reject vote-request: by last_log_id: !(req.last_log_id({}) >= my_last_log_id({}))",
                req.last_log_id.display(),
                self.state.last_log_id().display(),
            );
            return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), false);
        }

        let res = self.vote_handler().update_vote(&req.vote);

        tracing::info!("handle vote request result: req: {}, result: {:?}", req, res);

        VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().cloned(), res.is_ok())
    }

    /// Handles a vote response from `target`.
    ///
    /// Ignored when there is no candidate. A granted response for the candidate's current vote
    /// is counted, and the node becomes the leader once a quorum is reached. For any other
    /// response, a greater last log id is recorded via `set_greater_log`. The responder's vote,
    /// converted to its non-committed form, is then offered to the vote handler.
    #[tracing::instrument(level = "debug", skip(self, resp))]
    pub(crate) fn handle_vote_resp(&mut self, target: C::NodeId, resp: VoteResponse<C>) {
        tracing::info!(
            "{}: resp: {}, target: {}, my_vote: {}, my_last_log_id: {}",
            func_name!(),
            resp,
            target,
            self.state.vote_ref(),
            self.state.last_log_id().display()
        );

        let Some(candidate) = self.candidate_mut() else {
            return;
        };

        // A grant for a different vote (e.g. a delayed reply to an older election) is not counted.
        if resp.vote_granted && &resp.vote == candidate.vote_ref() {
            let quorum_granted = candidate.grant_by(&target);
            if quorum_granted {
                tracing::info!("a quorum granted my vote");
                self.establish_leader();
            }
            return;
        }

        if resp.last_log_id.as_ref() > self.state.last_log_id() {
            tracing::info!(
                "{}: seen a greater log id: {}",
                func_name!(),
                resp.last_log_id.display()
            );
            self.set_greater_log();
        }

        // Best effort: a rejection here only means the responder's vote is not greater than ours.
        let vote = resp.vote.to_non_committed().into_vote();
        self.vote_handler().update_vote(&vote).ok();
    }

    /// Handles an append-entries request and queues the response.
    ///
    /// On success the response is conditioned on the accepted log I/O being flushed. On failure
    /// it is queued with no condition.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn handle_append_entries(&mut self, vote: &VoteOf<C>, segment: LogSegment<C>, tx: AppendEntriesTx<C>) {
        tracing::debug!(
            "{}: vote: {}, segment: {}, my_vote: {}, my_last_log_id: {}",
            func_name!(),
            vote,
            segment,
            self.state.vote_ref(),
            self.state.last_log_id().display()
        );

        let stream_result: StreamAppendResult<C> = self.append_entries(vote, segment).map_err(Into::into);

        let condition = if stream_result.is_ok() {
            // A successful `append_entries` has gone through `update_vote`, so an accepted log
            // I/O is expected to be present.
            Some(Condition::IOFlushed {
                io_id: self.state.accepted_log_io().unwrap().clone(),
            })
        } else {
            None
        };

        self.output.push_command(Command::Respond {
            when: condition,
            resp: Respond::new(stream_result, tx),
        });
    }

    /// Appends `segment` as a follower of the leader identified by `vote`.
    ///
    /// Returns the last log id of the segment on success.
    ///
    /// # Errors
    /// [`RejectAppendEntries`] if the vote is rejected, or if `segment.prev_log_id` does not match
    /// the local log.
    pub(crate) fn append_entries(
        &mut self,
        vote: &VoteOf<C>,
        segment: LogSegment<C>,
    ) -> Result<Option<LogIdOf<C>>, RejectAppendEntries<C>> {
        self.vote_handler().update_vote(vote)?;

        let last = segment.last();

        let mut fh = self.following_handler();
        fh.ensure_log_consecutive(segment.prev_log_id.as_ref())?;
        fh.append_entries(segment.prev_log_id, segment.entries);

        Ok(last)
    }

    /// Handles a full snapshot sent by the leader identified by `vote`.
    ///
    /// If `accept_vote` does not hand `tx` back (the vote was not accepted), nothing else is done.
    /// In that case the closure passed to it builds the response. Otherwise the snapshot is
    /// installed and a response with the current vote is queued, conditioned on whatever the
    /// install returns.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn handle_install_full_snapshot(
        &mut self,
        vote: VoteOf<C>,
        snapshot: SnapshotOf<C>,
        tx: OneshotSenderOf<C, SnapshotResponse<C>>,
    ) {
        tracing::info!("{}: vote: {}, snapshot: {}", func_name!(), vote, snapshot);

        let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| {
            SnapshotResponse::new(state.vote_ref().clone())
        });

        let Some(tx) = vote_res else {
            return;
        };

        let mut fh = self.following_handler();
        let cond = fh.install_full_snapshot(snapshot);

        let res = SnapshotResponse {
            vote: self.state.vote_ref().clone(),
        };

        self.output.push_command(Command::Respond {
            when: cond,
            resp: Respond::new(res, tx),
        });
    }

    /// Queues a state-machine command to begin receiving a snapshot, replying through `tx`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn handle_begin_receiving_snapshot(&mut self, tx: OneshotSenderOf<C, SnapshotDataOf<C>>) {
        tracing::info!("{}", func_name!());
        self.output.push_command(Command::from(sm::Command::begin_receiving_snapshot(tx)));
    }

    /// Lets the leader step down once that is safe.
    ///
    /// The internal server state is re-evaluated only when the effective membership's log id is
    /// `<=` the committed log id, i.e. once the effective membership is committed. Until then
    /// nothing changes.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn leader_step_down(&mut self) {
        tracing::debug!("leader step down, node_id: {}", self.config.id);

        let em = self.state.membership_state.effective();

        tracing::debug!(
            "membership: {}, committed: {}, is_leading: {}",
            em,
            self.state.committed().display(),
            self.state.is_leading(&self.config.id),
        );

        if em.log_id().as_ref() <= self.state.committed() {
            self.vote_handler().update_internal_server_state();
        }
    }

    /// Finishes a snapshot build.
    ///
    /// Always clears the "building snapshot" flag. With `None` (the state machine deferred the
    /// build) nothing else happens. Otherwise snapshot progress and snapshot metadata are updated.
    /// If the metadata actually changed, a policy-based purge is scheduled and attempted.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn on_building_snapshot_done(&mut self, meta: Option<SnapshotMetaOf<C>>) {
        tracing::info!("{}: snapshot_meta: {}", func_name!(), meta.display());

        self.state.io_state_mut().set_building_snapshot(false);

        let Some(meta) = meta else {
            tracing::info!("snapshot building deferred by state machine, no meta update");
            return;
        };

        if let Some(last_log_id) = meta.last_log_id.clone() {
            self.state.io_state_mut().snapshot.try_update_all(last_log_id);
        }

        let mut h = self.snapshot_handler();

        let updated = h.update_snapshot(meta);
        if !updated {
            return;
        }

        self.log_handler().schedule_policy_based_purge();
        self.try_purge_log();
    }

    /// Attempts to purge logs up to the scheduled purge point.
    ///
    /// A leader delegates to the replication handler; any other node purges through the log
    /// handler.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn try_purge_log(&mut self) {
        tracing::debug!("{}: purge_upto: {}", func_name!(), self.state.purge_upto().display());

        if self.leader.is_some() {
            self.replication_handler().try_purge_log();
        } else {
            self.log_handler().purge_log();
        }
    }

    /// Requests that logs up to and including `index` be purged.
    ///
    /// Does nothing if there is no snapshot, or if `index` is already covered by the scheduled
    /// purge. `index` is clamped to the snapshot's last log index, because logs that are not in a
    /// snapshot cannot be purged. The log id at the resulting index becomes the new purge target,
    /// and a purge is attempted.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn trigger_purge_log(&mut self, mut index: u64) {
        tracing::info!("{}: index: {}", func_name!(), index);

        let Some(snapshot_last_log_id) = self.state.snapshot_last_log_id().cloned() else {
            tracing::info!("no snapshot, cannot purge");
            return;
        };

        let scheduled = self.state.purge_upto();

        if index < scheduled.next_index() {
            tracing::info!(
                "no update, already scheduled: {}; index: {}",
                scheduled.display(),
                index,
            );
            return;
        }

        if index > snapshot_last_log_id.index() {
            tracing::info!(
                "cannot purge logs not in a snapshot; index: {}, last in snapshot log id: {}",
                index,
                snapshot_last_log_id
            );
            index = snapshot_last_log_id.index();
        }

        // `index` now lies after the scheduled purge point and no later than the snapshot's last
        // index; the log id there is expected to be known.
        let log_id = self.state.get_log_id(index).unwrap();
        tracing::info!("{}: purge_upto: {}", func_name!(), log_id);

        self.log_handler().update_purge_upto(log_id);
        self.try_purge_log();
    }

    /// Asks the leader to transfer leadership to `to`; ignored if this node is not the leader.
    pub(crate) fn trigger_transfer_leader(&mut self, to: C::NodeId) {
        tracing::info!("{}: to: {}", func_name!(), to);

        let Some(mut lh) = self.try_leader_handler().ok() else {
            tracing::info!(
                "{}: this node is not a Leader, ignore transfer Leader: to: {}",
                func_name!(),
                to
            );
            return;
        };

        lh.transfer_leader(to);
    }

    /// Returns the next state-machine apply command implied by I/O progress, if any.
    ///
    /// Only considered when the submitted and the accepted log I/O belong to the same vote.
    /// Entries may then be applied up to the smaller of the last submitted log id and the accepted
    /// apply progress. A `SaveCommittedAndApply` is returned if that point is beyond what has
    /// already been submitted for apply.
    pub(crate) fn next_progress_driven_command(&self) -> Option<Command<C, SM>> {
        let apply_progress = &self.state.io_state.apply_progress;
        let log_progress = &self.state.io_state.log_progress;

        if log_progress.submitted().map(|x| x.as_ref_vote()) == log_progress.accepted().map(|x| x.as_ref_vote()) {
            let apply_submitted = apply_progress.submitted();
            let apply_accepted = apply_progress.accepted();

            let log_submitted = log_progress.submitted().and_then(|io_id| io_id.last_log_id());

            let applicable_upto = log_submitted.min(apply_accepted);

            if apply_submitted.next_index() < applicable_upto.next_index() {
                // `applicable_upto` is `Some`: `None.next_index()` is the minimum, so the
                // strict `<` above cannot hold for `None`.
                let apply_upto = applicable_upto.cloned().unwrap();

                return Some(Command::SaveCommittedAndApply {
                    already_applied: apply_submitted.cloned(),
                    upto: apply_upto,
                });
            }
        }

        None
    }
}
impl<C, SM> Engine<C, SM>
where C: RaftTypeConfig
{
    /// Promotes the current candidate, which a quorum has granted, to leader.
    ///
    /// Panics if there is no candidate. If the establish handler yields no leader, this returns
    /// early. Otherwise it:
    /// - rebuilds the replication streams,
    /// - stores the leader's committed vote locally,
    /// - accepts a log I/O for that vote,
    /// - appends a blank entry through the leader handler.
    #[tracing::instrument(level = "debug", skip_all)]
    fn establish_leader(&mut self) {
        tracing::info!("{}", func_name!());

        let candidate = self.candidate.take().unwrap();
        let Some(leader) = self.establish_handler().establish(candidate) else {
            return;
        };

        // Copy out what is needed so the borrow held through `leader` ends here.
        let committed_vote = leader.committed_vote_ref().clone();
        let leader_last_log_id = leader.last_log_id().cloned();

        self.replication_handler().rebuild_replication_streams(true);

        let update_res = self.vote_handler().update_vote(&committed_vote.clone().into_vote());
        debug_assert!(update_res.is_ok(), "commit vote cannot fail but: {:?}", update_res);

        self.state.accept_log_io(IOId::new_log_io(committed_vote, leader_last_log_id));

        // A leader was installed just above, so the leader handler is available.
        self.try_leader_handler().unwrap().leader_append_entries([EntryPayload::Blank]);
    }

    /// Succeeds only when this node has not been initialized yet.
    ///
    /// # Errors
    /// [`NotAllowed`] carrying the current last log id and vote if the state is already initialized.
    fn check_initialize(&self) -> Result<(), NotAllowed<C>> {
        if self.state.is_initialized() {
            tracing::info!(
                "Engine::check_initialize(): cannot initialize: last_log_id: {}, vote: {}",
                self.state.last_log_id().display(),
                self.state.vote_ref()
            );

            return Err(NotAllowed {
                last_log_id: self.state.last_log_id().cloned(),
                vote: self.state.vote_ref().clone(),
            });
        }

        Ok(())
    }

    /// Succeeds only when this node is a voter in `m`.
    ///
    /// # Errors
    /// [`NotInMembers`] with this node's id and a copy of `m` otherwise.
    fn check_members_contain_me(&self, m: &Membership<C::NodeId, C::Node>) -> Result<(), NotInMembers<C>> {
        if m.is_voter(&self.config.id) {
            return Ok(());
        }

        Err(NotInMembers {
            node_id: self.config.id.clone(),
            membership: m.clone(),
        })
    }

    /// Whether a vote response has reported a log greater than ours since the last reset.
    pub(crate) fn is_there_greater_log(&self) -> bool {
        self.seen_greater_log
    }

    /// Records that a greater log has been seen.
    pub(crate) fn set_greater_log(&mut self) {
        self.seen_greater_log = true;
    }

    /// Forgets that a greater log has been seen.
    pub(crate) fn reset_greater_log(&mut self) {
        self.seen_greater_log = false;
    }

    /// Computes the server state this node should be in, according to `RaftState`.
    // Kept although currently unused outside of some builds.
    #[allow(dead_code)]
    pub(crate) fn calc_server_state(&self) -> ServerState {
        self.state.calc_server_state(&self.config.id)
    }

    /// Borrows the engine's parts as a [`VoteHandler`].
    pub(crate) fn vote_handler(&mut self) -> VoteHandler<'_, C, SM> {
        VoteHandler {
            config: &mut self.config,
            state: &mut self.state,
            output: &mut self.output,
            leader: &mut self.leader,
            candidate: &mut self.candidate,
        }
    }

    /// Borrows the engine's parts as a [`LogHandler`].
    pub(crate) fn log_handler(&mut self) -> LogHandler<'_, C, SM> {
        LogHandler {
            config: &mut self.config,
            state: &mut self.state,
            output: &mut self.output,
        }
    }

    /// Borrows the engine's parts as a [`SnapshotHandler`].
    pub(crate) fn snapshot_handler(&mut self) -> SnapshotHandler<'_, '_, C, SM> {
        SnapshotHandler {
            state: &mut self.state,
            output: &mut self.output,
        }
    }

    /// Borrows the engine's parts as a [`LeaderHandler`].
    ///
    /// # Errors
    /// [`ForwardToLeader`] when this node holds no leader state.
    pub(crate) fn try_leader_handler(&mut self) -> Result<LeaderHandler<'_, C, SM>, ForwardToLeader<C>> {
        let Some(leader) = self.leader.as_mut() else {
            tracing::debug!("not a leader, server_state: {:?}", self.state.server_state);
            return Err(self.state.forward_to_leader());
        };

        debug_assert!(
            leader.committed_vote_ref().as_ref_vote() >= self.state.vote_ref().as_ref_vote(),
            "leader.vote({}) >= state.vote({})",
            leader.committed_vote_ref(),
            self.state.vote_ref()
        );

        Ok(LeaderHandler {
            config: &mut self.config,
            leader,
            state: &mut self.state,
            output: &mut self.output,
        })
    }

    /// Borrows the engine's parts as a [`ReplicationHandler`], or `None` when not the leader.
    pub(crate) fn try_replication_handler(&mut self) -> Option<ReplicationHandler<'_, C, SM>> {
        let leader = self.leader.as_mut()?;

        Some(ReplicationHandler {
            config: &mut self.config,
            leader,
            state: &mut self.state,
            output: &mut self.output,
        })
    }

    /// Borrows the engine's parts as a [`ReplicationHandler`].
    ///
    /// # Panics
    /// If this node holds no leader state.
    pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<'_, C, SM> {
        let Some(leader) = self.leader.as_mut() else {
            unreachable!("There is no leader, cannot handle replication");
        };

        ReplicationHandler {
            config: &mut self.config,
            leader,
            state: &mut self.state,
            output: &mut self.output,
        }
    }

    /// Borrows the engine's parts as a [`FollowingHandler`] for the leader of the current vote.
    ///
    /// Debug builds assert that there is no local leader and that the current vote is committed.
    pub(crate) fn following_handler(&mut self) -> FollowingHandler<'_, C, SM> {
        debug_assert!(self.leader.is_none());

        let current_vote = self.state.vote_ref().clone();
        debug_assert!(
            current_vote.is_committed(),
            "Expect the Leader vote to be committed: {}",
            current_vote
        );

        FollowingHandler {
            leader_vote: current_vote.into_committed(),
            config: &mut self.config,
            state: &mut self.state,
            output: &mut self.output,
        }
    }

    /// Borrows the engine's parts as a [`ServerStateHandler`].
    pub(crate) fn server_state_handler(&mut self) -> ServerStateHandler<'_, C> {
        ServerStateHandler {
            state: &mut self.state,
            config: &self.config,
        }
    }

    /// Borrows the engine's parts as an [`EstablishHandler`].
    pub(crate) fn establish_handler(&mut self) -> EstablishHandler<'_, C> {
        EstablishHandler {
            leader: &mut self.leader,
            config: &mut self.config,
        }
    }
}
#[cfg(test)]
mod engine_testing {
    use crate::RaftTypeConfig;
    use crate::engine::Engine;
    use crate::engine::EngineConfig;
    use crate::proposer::LeaderQuorumSet;
    use crate::raft_state::RaftState;

    impl<C, SM> Engine<C, SM>
    where C: RaftTypeConfig
    {
        /// Test helper: installs a fresh leader built from the current state, replacing any
        /// existing one, and returns it.
        pub(crate) fn testing_new_leader(&mut self) -> &mut crate::proposer::Leader<C, LeaderQuorumSet<C>> {
            let fresh_leader = Box::new(self.state.new_leader());
            self.leader.insert(fresh_leader)
        }
    }

    impl<C> Engine<C, ()>
    where C: RaftTypeConfig
    {
        /// Test helper: an engine for node `id` with the default config and a new `RaftState`.
        pub(crate) fn testing_default(id: C::NodeId) -> Self {
            // Config is built first, then the state, as before.
            let engine_config = EngineConfig::new_default(id.clone());
            Self::new(RaftState::new(id), engine_config)
        }
    }
}