use std::error::Error;
use std::fmt;
use display_more::DisplayOptionExt;
use validit::Valid;
use validit::Validate;
use validit::less_equal;
use crate::RaftTypeConfig;
use crate::raft_state::IOId;
use crate::raft_state::io_state::io_progress::IOProgress;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::raft_state::io_state::monotonic::MonotonicIncrease;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::VoteOf;
pub(crate) mod io_id;
pub(crate) mod io_progress;
pub(crate) mod log_io_id;
pub(crate) mod monotonic;
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOState<C>
where C: RaftTypeConfig
{
building_snapshot: bool,
pub(crate) log_progress: Valid<IOProgress<IOId<C>>>,
pub(crate) apply_progress: Valid<IOProgress<LogIdOf<C>>>,
pub(crate) snapshot: Valid<IOProgress<LogIdOf<C>>>,
pub(crate) cluster_committed: MonotonicIncrease<LogIOId<C>>,
pub(crate) purged: Option<LogIdOf<C>>,
}
const LOG_PROGRESS_NAME: &str = "LogIO";
const APPLY_PROGRESS_NAME: &str = "Apply";
const SNAPSHOT_PROGRESS_NAME: &str = "Snapshot";
impl<C> Default for IOState<C>
where C: RaftTypeConfig
{
fn default() -> Self {
Self {
building_snapshot: false,
log_progress: new_progress(None, "xx", LOG_PROGRESS_NAME),
apply_progress: new_progress(None, "xx", APPLY_PROGRESS_NAME),
snapshot: new_progress(None, "xx", SNAPSHOT_PROGRESS_NAME),
cluster_committed: MonotonicIncrease::default(),
purged: None,
}
}
}
impl<C> Validate for IOState<C>
where C: RaftTypeConfig
{
fn validate(&self) -> Result<(), Box<dyn Error>> {
self.log_progress.validate()?;
self.apply_progress.validate()?;
self.snapshot.validate()?;
less_equal!(self.snapshot.submitted(), self.apply_progress.submitted());
less_equal!(&self.purged, &self.snapshot.flushed().cloned());
Ok(())
}
}
impl<C> IOState<C>
where C: RaftTypeConfig
{
pub(crate) fn new(
id: &str,
vote: &VoteOf<C>,
applied: Option<LogIdOf<C>>,
snapshot: Option<LogIdOf<C>>,
purged: Option<LogIdOf<C>>,
) -> Self {
Self {
building_snapshot: false,
log_progress: new_progress(Some(IOId::new(vote)), id, LOG_PROGRESS_NAME),
apply_progress: new_progress(applied, id, APPLY_PROGRESS_NAME),
snapshot: new_progress(snapshot, id, SNAPSHOT_PROGRESS_NAME),
cluster_committed: MonotonicIncrease::default(),
purged,
}
}
pub(crate) fn applied(&self) -> Option<&LogIdOf<C>> {
self.apply_progress.flushed()
}
pub(crate) fn snapshot(&self) -> Option<&LogIdOf<C>> {
self.snapshot.flushed()
}
pub(crate) fn set_building_snapshot(&mut self, building: bool) {
self.building_snapshot = building;
}
pub(crate) fn building_snapshot(&self) -> bool {
self.building_snapshot
}
pub(crate) fn update_purged(&mut self, log_id: Option<LogIdOf<C>>) {
self.purged = log_id;
}
pub(crate) fn purged(&self) -> Option<&LogIdOf<C>> {
self.purged.as_ref()
}
pub(crate) fn calculate_local_committed(&mut self) -> Option<LogIdOf<C>> {
let local_committed = self.do_calculate_local_committed();
tracing::debug!(
"{}, cluster_committed: {}, accepted: {}, local_committed: {}",
func_name!(),
self.cluster_committed.value().display(),
self.log_progress.accepted().display(),
local_committed.display()
);
local_committed
}
pub(crate) fn do_calculate_local_committed(&mut self) -> Option<LogIdOf<C>> {
let cluster_committed = self.cluster_committed.value()?.clone();
let accepted = self.log_progress.accepted()?.clone();
let cluster_committed_vote = cluster_committed.to_committed_vote();
let accepted_vote = accepted.to_committed_vote()?;
if accepted_vote >= cluster_committed_vote {
std::cmp::min(
accepted.last_log_id().cloned(),
cluster_committed.last_log_id().cloned(),
)
} else {
None
}
}
}
fn new_progress<T>(initial_value: Option<T>, id: impl ToString, name: &'static str) -> Valid<IOProgress<T>>
where T: PartialOrd + fmt::Debug + fmt::Display + Clone {
Valid::new(IOProgress::new_synchronized(initial_value, id, name))
}