use display_more::DisplayOptionExt;
use crate::RaftTypeConfig;
use crate::Vote;
use crate::async_runtime::watch::WatchSender;
use crate::core::io_flush_tracking::FlushPoint;
use crate::raft_state::IOId;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::WatchSenderOf;
/// Bundle of watch-channel senders used to publish I/O progress.
///
/// Each field holds the sending half of one watch channel. The `send_*_progress`
/// methods on this type write a new value into the matching channel only when it
/// differs from the value the channel currently holds.
pub(crate) struct IoProgressSender<C>
where C: RaftTypeConfig
{
/// Latest [`FlushPoint`] (built from a vote and a log id); written by `send_log_progress`.
pub(crate) log_tx: WatchSenderOf<C, Option<FlushPoint<C>>>,
/// Latest vote extracted from an I/O id; written by `send_log_progress`.
pub(crate) vote_tx: WatchSenderOf<C, Option<Vote<C::LeaderId>>>,
/// Latest log id passed to `send_commit_progress`.
pub(crate) commit_tx: WatchSenderOf<C, Option<LogIdOf<C>>>,
/// Latest log id passed to `send_snapshot_progress`.
pub(crate) snapshot_tx: WatchSenderOf<C, Option<LogIdOf<C>>>,
/// Latest log id passed to `send_apply_progress`.
pub(crate) apply_tx: WatchSenderOf<C, Option<LogIdOf<C>>>,
}
impl<C> IoProgressSender<C>
where C: RaftTypeConfig
{
    /// Publishes log I/O progress on the `vote_tx` and `log_tx` channels.
    ///
    /// `io_id` is split into a vote and a log id via `to_vote_and_log_id()`. The vote is
    /// written to `vote_tx`, and a [`FlushPoint`] built from the vote and the log id is
    /// written to `log_tx`. A channel is only reported as modified when the new value
    /// differs from the one it currently holds.
    ///
    /// Returns `None` without touching either channel when `io_id` is `None`, and
    /// `Some(())` otherwise.
    pub(crate) fn send_log_progress(&self, io_id: Option<IOId<C>>) -> Option<()> {
        tracing::debug!("send_log_progress: try to update to :{}", io_id.display());

        let (vote, log_id) = io_id?.to_vote_and_log_id();

        // The vote is consumed twice: on its own for `vote_tx`, and as part of the
        // `FlushPoint` for `log_tx`. Only the first consumer needs a copy.
        {
            let vote = vote.clone();
            self.vote_tx.send_if_modified(move |prev| {
                if prev.as_ref() == Some(&vote) {
                    return false;
                }
                tracing::debug!("send_log_progress: update vote to :{}", vote);
                *prev = Some(vote);
                true
            });
        }

        self.log_tx.send_if_modified(move |prev| {
            let flush_point = Some(FlushPoint::new(vote, log_id));
            if *prev == flush_point {
                return false;
            }
            tracing::debug!("send_log_progress: update log to :{}", flush_point.display());
            *prev = flush_point;
            true
        });

        Some(())
    }

    /// Publishes `log_id` on the `commit_tx` channel if it differs from the current value.
    ///
    /// Always returns `Some(())`.
    pub(crate) fn send_commit_progress(&self, log_id: Option<LogIdOf<C>>) -> Option<()> {
        tracing::debug!("send_commit_progress: try to update to :{}", log_id.display());

        self.commit_tx.send_if_modified(move |prev| {
            if *prev == log_id {
                return false;
            }
            tracing::debug!("send_commit_progress: update commit to :{}", log_id.display());
            // The closure owns `log_id`, so it is moved into the channel without cloning.
            *prev = log_id;
            true
        });

        Some(())
    }

    /// Publishes `log_id` on the `snapshot_tx` channel if it differs from the current value.
    ///
    /// Always returns `Some(())`.
    pub(crate) fn send_snapshot_progress(&self, log_id: Option<LogIdOf<C>>) -> Option<()> {
        tracing::debug!("send_snapshot_progress: try to update to :{}", log_id.display());

        self.snapshot_tx.send_if_modified(move |prev| {
            if *prev == log_id {
                return false;
            }
            tracing::debug!("send_snapshot_progress: update snapshot to :{}", log_id.display());
            // The closure owns `log_id`, so it is moved into the channel without cloning.
            *prev = log_id;
            true
        });

        Some(())
    }

    /// Publishes `log_id` on the `apply_tx` channel if it differs from the current value.
    ///
    /// Always returns `Some(())`.
    pub(crate) fn send_apply_progress(&self, log_id: Option<LogIdOf<C>>) -> Option<()> {
        tracing::debug!("send_apply_progress: try to update to :{}", log_id.display());

        self.apply_tx.send_if_modified(move |prev| {
            if *prev == log_id {
                return false;
            }
            tracing::debug!("send_apply_progress: update applied to :{}", log_id.display());
            // The closure owns `log_id`, so it is moved into the channel without cloning.
            *prev = log_id;
            true
        });

        Some(())
    }
}