use std::error::Error;
use std::ops::Deref;
use tokio::time::Instant;
use crate::engine::LogIdList;
use crate::entry::RaftEntry;
use crate::equal;
use crate::error::ForwardToLeader;
use crate::less_equal;
use crate::log_id::RaftLogId;
use crate::node::Node;
use crate::utime::UTime;
use crate::validate::Validate;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::NodeId;
use crate::ServerState;
use crate::SnapshotMeta;
use crate::Vote;
mod accepted;
pub(crate) mod io_state;
mod log_state_reader;
mod membership_state;
pub(crate) mod snapshot_streaming;
mod vote_state_reader;
pub(crate) use io_state::IOState;
#[allow(unused)] pub(crate) use io_state::LogIOId;
#[cfg(test)]
mod tests {
mod accepted_test;
mod forward_to_leader_test;
mod log_state_reader_test;
mod validate_test;
}
pub(crate) use accepted::Accepted;
pub(crate) use log_state_reader::LogStateReader;
pub use membership_state::MembershipState;
pub(crate) use vote_state_reader::VoteStateReader;
pub(crate) use crate::raft_state::snapshot_streaming::StreamingState;
#[derive(Clone, Debug)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub struct RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
pub(crate) vote: UTime<Vote<NID>>,
pub committed: Option<LogId<NID>>,
pub(crate) purged_next: u64,
pub log_ids: LogIdList<NID>,
pub membership_state: MembershipState<NID, N>,
pub snapshot_meta: SnapshotMeta<NID, N>,
pub server_state: ServerState,
pub(crate) accepted: Accepted<NID>,
pub(crate) io_state: IOState<NID>,
pub(crate) snapshot_streaming: Option<StreamingState>,
pub(crate) purge_upto: Option<LogId<NID>>,
}
impl<NID, N> LogStateReader<NID> for RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
fn get_log_id(&self, index: u64) -> Option<LogId<NID>> {
self.log_ids.get(index)
}
fn last_log_id(&self) -> Option<&LogId<NID>> {
self.log_ids.last()
}
fn committed(&self) -> Option<&LogId<NID>> {
self.committed.as_ref()
}
fn io_applied(&self) -> Option<&LogId<NID>> {
self.io_state.applied()
}
fn io_purged(&self) -> Option<&LogId<NID>> {
self.io_state.purged()
}
fn snapshot_last_log_id(&self) -> Option<&LogId<NID>> {
self.snapshot_meta.last_log_id.as_ref()
}
fn purge_upto(&self) -> Option<&LogId<NID>> {
self.purge_upto.as_ref()
}
fn last_purged_log_id(&self) -> Option<&LogId<NID>> {
if self.purged_next == 0 {
return None;
}
self.log_ids.first()
}
}
impl<NID, N> VoteStateReader<NID> for RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
fn vote_ref(&self) -> &Vote<NID> {
self.vote.deref()
}
}
impl<NID, N> Validate for RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
fn validate(&self) -> Result<(), Box<dyn Error>> {
if self.purged_next == 0 {
less_equal!(self.log_ids.first().index(), Some(0));
} else {
equal!(self.purged_next, self.log_ids.first().next_index());
}
less_equal!(self.last_purged_log_id(), self.purge_upto());
if self.snapshot_last_log_id().is_none() {
less_equal!(self.purge_upto(), self.committed());
} else {
less_equal!(self.purge_upto(), self.snapshot_last_log_id());
}
less_equal!(self.snapshot_last_log_id(), self.committed());
less_equal!(self.committed(), self.last_log_id());
self.membership_state.validate()?;
Ok(())
}
}
impl<NID, N> RaftState<NID, N>
where
NID: NodeId,
N: Node,
{
pub fn vote_ref(&self) -> &Vote<NID> {
self.vote.deref()
}
pub fn vote_last_modified(&self) -> Option<Instant> {
self.vote.utime()
}
pub(crate) fn accepted(&self) -> Option<&LogId<NID>> {
self.accepted.last_accepted_log_id(self.vote_ref().leader_id())
}
pub(crate) fn update_accepted(&mut self, accepted: Option<LogId<NID>>) {
debug_assert!(
self.vote_ref().is_committed(),
"vote must be committed: {}",
self.vote_ref()
);
debug_assert!(
self.vote_ref().leader_id() >= self.accepted.leader_id(),
"vote.leader_id: {} must be >= accepted.leader_id: {}",
self.vote_ref().leader_id(),
self.accepted.leader_id()
);
if accepted.as_ref() > self.accepted.last_accepted_log_id(self.vote_ref().leader_id()) {
self.accepted = Accepted::new(*self.vote_ref().leader_id(), accepted);
}
}
pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId<NID> + 'a>(&mut self, new_log_ids: &[LID]) {
self.log_ids.extend_from_same_leader(new_log_ids)
}
pub(crate) fn extend_log_ids<'a, LID: RaftLogId<NID> + 'a>(&mut self, new_log_id: &[LID]) {
self.log_ids.extend(new_log_id)
}
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_committed(&mut self, committed: &Option<LogId<NID>>) -> Option<Option<LogId<NID>>> {
if committed.as_ref() > self.committed() {
let prev = self.committed().copied();
self.committed = *committed;
self.membership_state.commit(committed);
Some(prev)
} else {
None
}
}
pub(crate) fn io_state_mut(&mut self) -> &mut IOState<NID> {
&mut self.io_state
}
pub(crate) fn io_state(&self) -> &IOState<NID> {
&self.io_state
}
pub(crate) fn first_conflicting_index<Ent>(&self, entries: &[Ent]) -> usize
where Ent: RaftLogId<NID> {
let l = entries.len();
for (i, ent) in entries.iter().enumerate() {
let log_id = ent.get_log_id();
if !self.has_log_id(log_id) {
tracing::debug!(
at = display(i),
entry_log_id = display(log_id),
"found nonexistent log id"
);
return i;
}
}
tracing::debug!("not found nonexistent");
l
}
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn purge_log(&mut self, upto: &LogId<NID>) {
self.purged_next = upto.index + 1;
self.log_ids.purge(upto);
}
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn calc_server_state(&self, id: &NID) -> ServerState {
tracing::debug!(
is_member = display(self.is_voter(id)),
is_leader = display(self.is_leader(id)),
is_leading = display(self.is_leading(id)),
"states"
);
if self.is_voter(id) {
if self.is_leader(id) {
ServerState::Leader
} else if self.is_leading(id) {
ServerState::Candidate
} else {
ServerState::Follower
}
} else {
ServerState::Learner
}
}
pub(crate) fn is_voter(&self, id: &NID) -> bool {
self.membership_state.is_voter(id)
}
pub(crate) fn is_leading(&self, id: &NID) -> bool {
self.vote.leader_id().voted_for().as_ref() == Some(id)
}
pub(crate) fn is_leader(&self, id: &NID) -> bool {
self.vote.leader_id().voted_for().as_ref() == Some(id) && self.vote.is_committed()
}
pub(crate) fn assign_log_ids<'a, Ent: RaftEntry<NID, N> + 'a>(
&mut self,
entries: impl IntoIterator<Item = &'a mut Ent>,
) {
let mut log_id = LogId::new(
self.vote_ref().committed_leader_id().unwrap(),
self.last_log_id().next_index(),
);
for entry in entries {
entry.set_log_id(&log_id);
tracing::debug!("assign log id: {}", log_id);
log_id.index += 1;
}
}
pub(crate) fn forward_to_leader(&self) -> ForwardToLeader<NID, N> {
let vote = self.vote_ref();
if vote.is_committed() {
let id = vote.leader_id().voted_for().unwrap();
let node = self.membership_state.effective().get_node(&id);
if let Some(n) = node {
return ForwardToLeader::new(id, n.clone());
} else {
tracing::debug!("id={} is not in membership, when getting leader id", id);
}
};
ForwardToLeader::empty()
}
}