use std::error::Error;
use std::ops::Deref;
use validit::Valid;
use validit::Validate;
use crate::RaftTypeConfig;
use crate::ServerState;
use crate::engine::LogIdList;
use crate::errors::ForwardToLeader;
use crate::log_id::raft_log_id::RaftLogId;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::RefLogIdOf;
use crate::type_config::alias::SnapshotMetaOf;
use crate::utime::Leased;
pub(crate) mod io_state;
mod log_state_reader;
mod membership_state;
mod vote_state_reader;
pub(crate) use io_state::IOState;
#[allow(unused)]
pub(crate) use io_state::io_id::IOId;
// Test submodules for this module; the attribute below compiles them only when `cfg(test)` is set.
#[cfg(test)]
mod tests {
mod forward_to_leader_test;
mod is_initialized_test;
mod log_state_reader_test;
mod update_committed_test;
mod validate_test;
}
use display_more::DisplayOptionExt;
pub(crate) use log_state_reader::LogStateReader;
pub use membership_state::MembershipState;
pub(crate) use vote_state_reader::VoteStateReader;
use crate::base::shared_id_generator::SharedIdGenerator;
use crate::entry::RaftEntry;
use crate::entry::raft_entry_ext::RaftEntryExt;
use crate::progress::inflight_id::InflightId;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::io_state::io_progress::IOProgress;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MembershipStateOf;
use crate::type_config::alias::TermOf;
use crate::type_config::alias::VoteOf;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::vote::raft_vote::RaftVoteExt;
/// In-memory state of one node: its vote, the log ids it knows about, membership,
/// snapshot metadata, server state and I/O progress.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub struct RaftState<C>
where C: RaftTypeConfig
{
/// The vote of this node, wrapped in [`Leased`]; `vote_last_modified()` reports its last update time.
pub(crate) vote: Leased<VoteOf<C>, InstantOf<C>>,
/// The log ids known to this node; queried for first/last/purged log ids and by index.
pub log_ids: LogIdList<CommittedLeaderIdOf<C>>,
/// Membership state; answers whether a node id is contained in the membership or is a voter.
pub membership_state: MembershipStateOf<C>,
/// Metadata of the snapshot; its `last_log_id` is what `snapshot_last_log_id()` returns.
pub snapshot_meta: SnapshotMetaOf<C>,
/// The number used for the most recently issued inflight id; `new_inflight_id()` increments it.
pub(crate) last_inflight_id: u64,
/// The server state (role) recorded for this node.
pub server_state: ServerState,
/// I/O progress bookkeeping (log, apply, snapshot, purge), wrapped in [`Valid`].
pub(crate) io_state: Valid<IOState<C>>,
/// The log id up to which logs are meant to be purged.
///
/// `validate()` checks that the last purged log id does not exceed this value.
pub(crate) purge_upto: Option<LogIdOf<C>>,
/// Shared id generator; `new_leader()` hands a clone of it to the `Leader` it creates.
pub(crate) progress_id_gen: SharedIdGenerator,
}
impl<C> Default for RaftState<C>
where
    C: RaftTypeConfig,
    C::NodeId: Default,
{
    /// Builds an empty state whose vote belongs to the default node id.
    ///
    /// Delegates to [`RaftState::new`] so the initial value of every field is defined in
    /// exactly one place and the two constructors cannot drift apart.
    fn default() -> Self {
        Self::new(C::NodeId::default())
    }
}
/// Read-only accessors for the log-related positions tracked by [`RaftState`].
impl<C> LogStateReader<CommittedLeaderIdOf<C>> for RaftState<C>
where C: RaftTypeConfig
{
    /// Returns the log id at `index` as found by `log_ids.ref_at`, or `None` if there is none.
    fn ref_log_id(&self, index: u64) -> Option<RefLogIdOf<'_, C>> {
        let known_ids = &self.log_ids;
        known_ids.ref_at(index)
    }

    /// Returns the last log id held in `log_ids`.
    fn last_log_id(&self) -> Option<&LogIdOf<C>> {
        let known_ids = &self.log_ids;
        known_ids.last()
    }

    /// Returns the committed log id, i.e. the value accepted by the apply progress.
    fn committed(&self) -> Option<&LogIdOf<C>> {
        self.io_state().apply_progress.accepted()
    }

    /// Returns the applied log id reported by the I/O state.
    fn io_applied(&self) -> Option<&LogIdOf<C>> {
        self.io_state().applied()
    }

    /// Returns the snapshot log id reported by the I/O state.
    fn io_snapshot_last_log_id(&self) -> Option<&LogIdOf<C>> {
        self.io_state().snapshot()
    }

    /// Returns the purged log id reported by the I/O state.
    fn io_purged(&self) -> Option<&LogIdOf<C>> {
        self.io_state().purged()
    }

    /// Returns the last log id recorded in the snapshot metadata.
    fn snapshot_last_log_id(&self) -> Option<&LogIdOf<C>> {
        let meta = &self.snapshot_meta;
        meta.last_log_id.as_ref()
    }

    /// Returns the log id up to which logs are meant to be purged.
    fn purge_upto(&self) -> Option<&LogIdOf<C>> {
        Option::as_ref(&self.purge_upto)
    }

    /// Returns the purged log id recorded in `log_ids`.
    fn last_purged_log_id(&self) -> Option<&LogIdOf<C>> {
        let known_ids = &self.log_ids;
        known_ids.purged()
    }
}
/// Exposes the current vote of [`RaftState`] through the `VoteStateReader` trait.
impl<C> VoteStateReader<C> for RaftState<C>
where C: RaftTypeConfig
{
    /// Returns a reference to the vote stored inside the `Leased` wrapper.
    fn vote_ref(&self) -> &VoteOf<C> {
        Deref::deref(&self.vote)
    }
}
/// Consistency checks over the log positions tracked by [`RaftState`].
///
/// NOTE(review): each `validit::less_equal!(a, b)` presumably makes `validate` return an
/// error unless `a <= b` — confirm against the `validit` documentation.
impl<C> Validate for RaftState<C>
where C: RaftTypeConfig
{
/// Checks the ordering of the purge, snapshot, committed and last-log positions, then
/// validates the membership state and the I/O state.
fn validate(&self) -> Result<(), Box<dyn Error>> {
// With nothing purged, the first known log id (if any) is checked against index 0.
if self.log_ids.purged().is_none() {
validit::less_equal!(self.log_ids.first().map(|r| r.index()), Some(0));
}
// last purged <= purge_upto
validit::less_equal!(self.last_purged_log_id(), self.purge_upto());
if self.snapshot_last_log_id().is_none() {
// No snapshot log id is recorded: compare purge_upto directly with committed.
validit::less_equal!(self.purge_upto(), self.committed());
} else {
// A snapshot log id is recorded: purge_upto <= snapshot last log id.
validit::less_equal!(self.purge_upto(), self.snapshot_last_log_id());
}
// snapshot last log id <= committed <= last log id
validit::less_equal!(self.snapshot_last_log_id(), self.committed());
validit::less_equal!(self.committed(), self.last_log_id());
// Delegate to the nested validators; the first error is propagated with `?`.
self.membership_state.validate()?;
self.io_state.validate()?;
Ok(())
}
}
impl<C> RaftState<C>
where C: RaftTypeConfig
{
/// Creates an empty state for the node `node_id`.
///
/// The vote is built from `node_id` with the default term and has no last-update time;
/// every other field starts from its default value.
#[allow(dead_code)]
pub(crate) fn new(node_id: C::NodeId) -> Self {
    let initial_vote = Leased::without_last_update(VoteOf::<C>::new_with_default_term(node_id));

    Self {
        vote: initial_vote,
        log_ids: LogIdList::default(),
        membership_state: MembershipState::default(),
        snapshot_meta: SnapshotMetaOf::<C>::default(),
        last_inflight_id: 0,
        server_state: ServerState::default(),
        io_state: Valid::new(IOState::default()),
        purge_upto: None,
        progress_id_gen: Default::default(),
    }
}
/// Returns a reference to the current vote stored inside the `Leased` wrapper.
pub fn vote_ref(&self) -> &VoteOf<C> {
    Deref::deref(&self.vote)
}
/// Returns the last-update time recorded by the `Leased` wrapper around the vote, if any.
pub fn vote_last_modified(&self) -> Option<InstantOf<C>> {
self.vote.last_update()
}
/// Returns the committed log id, i.e. the value accepted by the apply progress.
pub fn committed(&self) -> Option<&LogIdOf<C>> {
    self.io_state().apply_progress.accepted()
}
/// Reports whether this node holds any state beyond the initial one.
///
/// Returns `true` when a last log id exists, or when the term of the current vote's leader
/// id differs from the default term.
pub(crate) fn is_initialized(&self) -> bool {
    let has_log = self.last_log_id().is_some();
    has_log || self.vote_ref().leader_id().term() != TermOf::<C>::default()
}
/// Returns the IO id currently accepted by the log progress, if any.
pub(crate) fn accepted_log_io(&self) -> Option<&IOId<C>> {
    self.io_state().log_progress.accepted()
}
/// Records `accepted` as the accepted log IO id and returns the previously accepted one.
///
/// The stored value is replaced only when `accepted` is strictly greater than the current
/// one; otherwise the state is left untouched. The previous value is returned either way.
///
/// # Panics
///
/// In builds with `debug_assertions`, panics if the vote derived from `accepted` is smaller
/// than the vote derived from the currently accepted IO id.
pub(crate) fn accept_log_io(&mut self, accepted: IOId<C>) -> Option<IOId<C>> {
    let curr_accepted = self.log_progress().accepted().cloned();
    tracing::debug!(
        "{}: accept_log: current: {}, new_accepted: {}",
        func_name!(),
        curr_accepted.display(),
        accepted
    );
    if cfg!(debug_assertions) {
        let new_vote = accepted.to_app_vote();
        // Borrow the current id; cloning the whole `Option` just to read its vote is unnecessary.
        let current_vote = curr_accepted.as_ref().map(|io_id| io_id.to_app_vote());
        assert!(
            Some(new_vote.as_ref_vote()) >= current_vote.as_ref().map(|x| x.as_ref_vote()),
            "new accepted.committed_vote {} must be >= current accepted.committed_vote: {}",
            new_vote,
            current_vote.display(),
        );
    }
    // Only ever move the accepted IO id forward.
    if Some(&accepted) > curr_accepted.as_ref() {
        self.log_progress_mut().accept(accepted);
    }
    curr_accepted
}
/// Adds `new_log_ids` to `log_ids` by delegating to `LogIdList::extend_from_same_leader`.
///
/// The iterator must be double-ended, as required by the bound below.
/// NOTE(review): the name implies all given log ids come from one leader — confirm at call sites.
pub(crate) fn extend_log_ids_from_same_leader<LID, I>(&mut self, new_log_ids: I)
where
LID: RaftLogId<CommittedLeaderId = CommittedLeaderIdOf<C>>,
I: IntoIterator<Item = LID>,
<I as IntoIterator>::IntoIter: DoubleEndedIterator,
{
self.log_ids.extend_from_same_leader(new_log_ids)
}
/// Adds the given log ids to `log_ids` by delegating to `LogIdList::extend`.
///
/// Despite its singular name, `new_log_id` is an iterable of log ids; its iterator must be
/// an `ExactSizeIterator`, as required by the bound below.
pub(crate) fn extend_log_ids<LID, I>(&mut self, new_log_id: I)
where
LID: RaftLogId<CommittedLeaderId = CommittedLeaderIdOf<C>>,
I: IntoIterator<Item = LID>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
self.log_ids.extend(new_log_id)
}
/// Feeds a newly learned cluster-wide committed position into the I/O state and then
/// advances the local committed log id accordingly.
///
/// Steps:
/// 1. Try to update `io_state.cluster_committed` with `cluster_committed`.
/// 2. Recalculate the local committed log id from the I/O state.
/// 3. Apply it through `update_local_committed`, which only moves forward.
pub(crate) fn update_committed(&mut self, cluster_committed: LogIOId<C>) {
    tracing::debug!(
        "{}: leader_committed: {}, my_accepted: {}, my_committed: {}",
        func_name!(),
        cluster_committed,
        self.accepted_log_io().display(),
        self.committed().display()
    );
    // `cluster_committed` is not used again, so it is moved instead of cloned.
    // `.ok()` discards the result on purpose: a rejected update is simply ignored
    // (presumably when the new value is not ahead of the stored one — confirm in `try_update`).
    self.io_state_mut().cluster_committed.try_update(cluster_committed).ok();
    let local_committed = self.io_state.calculate_local_committed();
    self.update_local_committed(&local_committed);
}
/// Advances the local committed log id to `committed` if that is greater than the current one.
///
/// Returns `Some(previous_committed)` when the committed log id was advanced, and `None`
/// when nothing changed (including when `committed` is `None`). On advance, the apply
/// progress accepts the new log id and the membership state is told about the new commit.
pub(crate) fn update_local_committed(&mut self, committed: &Option<LogIdOf<C>>) -> Option<Option<LogIdOf<C>>> {
    // `None` can never be greater than the current value, so there is nothing to do.
    let new_committed = committed.as_ref()?;

    if Some(new_committed) > self.committed() {
        let prev = self.committed().cloned();
        self.apply_progress_mut().accept(new_committed.clone());
        self.membership_state.commit(committed);
        Some(prev)
    } else {
        None
    }
}
/// Returns the `log_progress` field of the I/O state.
pub(crate) fn log_progress(&self) -> &IOProgress<IOId<C>> {
&self.io_state().log_progress
}
/// Returns a mutable reference to the `log_progress` field of the I/O state.
pub(crate) fn log_progress_mut(&mut self) -> &mut IOProgress<IOId<C>> {
&mut self.io_state_mut().log_progress
}
/// Returns the `apply_progress` field of the I/O state; its accepted value is what
/// `committed()` reports.
pub(crate) fn apply_progress(&self) -> &IOProgress<LogIdOf<C>> {
&self.io_state().apply_progress
}
/// Returns a mutable reference to the `apply_progress` field of the I/O state.
pub(crate) fn apply_progress_mut(&mut self) -> &mut IOProgress<LogIdOf<C>> {
&mut self.io_state_mut().apply_progress
}
/// Returns a mutable reference to the `snapshot` progress field of the I/O state.
pub(crate) fn snapshot_progress_mut(&mut self) -> &mut IOProgress<LogIdOf<C>> {
&mut self.io_state_mut().snapshot
}
/// Returns a mutable reference to the `IOState` held inside the `Valid` wrapper
/// (the field is `Valid<IOState<C>>`, so this relies on deref coercion).
pub(crate) fn io_state_mut(&mut self) -> &mut IOState<C> {
&mut self.io_state
}
/// Returns a shared reference to the `IOState` held inside the `Valid` wrapper
/// (the field is `Valid<IOState<C>>`, so this relies on deref coercion).
pub(crate) fn io_state(&self) -> &IOState<C> {
&self.io_state
}
/// Finds the position in `entries` of the first entry whose log id this state does not have.
///
/// The returned value is an index into the `entries` slice, not a log index. If every
/// entry's log id is present, `entries.len()` is returned.
pub(crate) fn first_conflicting_index<Ent>(&self, entries: &[Ent]) -> usize
where Ent: RaftEntry<CommittedLeaderId = CommittedLeaderIdOf<C>> {
    let first_unknown = entries.iter().enumerate().find_map(|(pos, entry)| {
        let log_id = entry.ref_log_id();
        if self.has_log_id(log_id.clone()) {
            None
        } else {
            Some((pos, log_id))
        }
    });

    match first_unknown {
        Some((pos, log_id)) => {
            tracing::debug!("found conflicting log id at index {}: {}", pos, log_id);
            pos
        }
        None => {
            tracing::debug!("no conflicting log id found");
            entries.len()
        }
    }
}
/// Purges `log_ids` up to `upto` by delegating to `LogIdList::purge`.
///
/// Only the in-memory list of log ids is touched here; `purge_upto` and the I/O state are
/// not modified by this method.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn purge_log(&mut self, upto: &LogIdOf<C>) {
self.log_ids.purge(upto);
}
/// Derives the server state that node `id` should be in, from the vote and the membership.
///
/// - `Leader` if `id` is the leader (leading with a committed vote);
/// - `Candidate` if `id` is leading but is not the leader;
/// - `Follower` if `id` is a voter;
/// - `Learner` otherwise.
///
/// This only computes the value; it does not modify `server_state`.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn calc_server_state(&self, id: &C::NodeId) -> ServerState {
    tracing::debug!(
        "states: contains: {}, is_voter: {}, is_leader: {}, is_leading: {}",
        self.membership_state.contains(id),
        self.is_voter(id),
        self.is_leader(id),
        self.is_leading(id)
    );

    if self.is_leader(id) {
        return ServerState::Leader;
    }
    if self.is_leading(id) {
        return ServerState::Candidate;
    }
    if self.is_voter(id) {
        ServerState::Follower
    } else {
        ServerState::Learner
    }
}
/// Returns whether the membership state considers `id` a voter.
pub(crate) fn is_voter(&self, id: &C::NodeId) -> bool {
self.membership_state.is_voter(id)
}
/// Returns whether `id` is in the membership and is the leader node id of the current vote.
///
/// The vote does not have to be committed; see `is_leader` for that stricter check.
pub(crate) fn is_leading(&self, id: &C::NodeId) -> bool {
    if !self.membership_state.contains(id) {
        return false;
    }
    self.vote.leader_node_id() == id
}
/// Returns whether `id` is leading (see `is_leading`) and the current vote is committed.
pub(crate) fn is_leader(&self, id: &C::NodeId) -> bool {
    let leading = self.is_leading(id);
    leading && self.vote.is_committed()
}
/// Builds a `Leader` from the current state.
///
/// The leader is created from the committed form of the current vote, the quorum set and
/// learner ids of the effective membership, the log ids reported by
/// `log_ids.by_last_leader()`, and a clone of the shared progress id generator.
pub(crate) fn new_leader(&mut self) -> Leader<C, LeaderQuorumSet<C>> {
    let effective = &self.membership_state.effective().membership();

    let committed_vote = self.vote_ref().to_committed();
    let quorum_set = effective.to_quorum_set();
    let learners = effective.learner_ids();
    let last_leader_log_ids = self.log_ids.by_last_leader();
    let id_gen = self.progress_id_gen.clone();

    Leader::new(committed_vote, quorum_set, learners, last_leader_log_ids, id_gen)
}
/// Builds a `ForwardToLeader` pointing at the leader named by the current vote.
///
/// Returns an empty `ForwardToLeader` when the vote is not committed; otherwise delegates
/// to `new_forward_to_leader` with the node id of the vote's leader.
pub(crate) fn forward_to_leader(&self) -> ForwardToLeader<C> {
    let current_vote = self.vote_ref();
    if !current_vote.is_committed() {
        return ForwardToLeader::empty();
    }

    let leader_node_id = current_vote.to_leader_id().node_id().clone();
    self.new_forward_to_leader(leader_node_id)
}
/// Builds a `ForwardToLeader` for node `to`, looking up its node info in the effective
/// membership.
///
/// Returns an empty `ForwardToLeader` when `to` is not found in the effective membership.
pub(crate) fn new_forward_to_leader(&self, to: C::NodeId) -> ForwardToLeader<C> {
    let target = self.membership_state.effective().get_node(&to);
    match target {
        Some(node_info) => ForwardToLeader::new(to, node_info.clone()),
        None => {
            tracing::debug!("id={} is not in membership, when getting leader id", to);
            ForwardToLeader::empty()
        }
    }
}
/// Issues the next inflight id: bumps the `last_inflight_id` counter by one and wraps the
/// new value in an `InflightId`.
pub(crate) fn new_inflight_id(&mut self) -> InflightId {
    let next = self.last_inflight_id + 1;
    self.last_inflight_id = next;
    InflightId::new(next)
}
}