use display_more::DisplayOptionExt;
use crate::LogIdOptionExt;
use crate::RaftTypeConfig;
use crate::engine::EngineConfig;
use crate::progress::Inflight;
use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::type_config::alias::LogIdOf;
pub(crate) struct Updater<'a, C>
where C: RaftTypeConfig
{
engine_config: &'a EngineConfig<C>,
entry: &'a mut ProgressEntry<C>,
}
impl<'a, C> Updater<'a, C>
where C: RaftTypeConfig
{
pub(crate) fn new(engine_config: &'a EngineConfig<C>, entry: &'a mut ProgressEntry<C>) -> Self {
Self { engine_config, entry }
}
pub(crate) fn update_conflicting(&mut self, conflict: u64, inflight_id: Option<InflightId>) {
tracing::debug!(
"update_conflict: current progress_entry: {}; conflict: {}",
self.entry,
conflict
);
if let Some(inflight_id) = inflight_id {
self.entry.inflight.conflict(conflict, inflight_id);
}
if conflict >= self.entry.searching_end {
tracing::debug!(
"conflict {} >= searching_end {}; no need to update",
conflict,
self.entry.searching_end
);
return;
}
self.entry.searching_end = conflict;
let allow_reset = self.entry.allow_log_reversion || self.engine_config.allow_log_reversion;
if allow_reset {
if conflict < self.entry.matching().next_index() {
tracing::warn!(
"conflict {} < last matching {}: \
follower log is reverted; \
with 'allow_log_reversion' enabled, this is allowed.",
conflict,
self.entry.matching().display(),
);
self.entry.matching = None;
self.entry.allow_log_reversion = false;
if self.entry.inflight.is_logs_since() {
self.entry.inflight = Inflight::None;
}
}
} else {
debug_assert!(
conflict >= self.entry.matching().next_index(),
"follower log reversion is not allowed \
without `allow_log_reversion` enabled; \
matching: {}; conflict: {}",
self.entry.matching().display(),
conflict
);
}
}
pub(crate) fn update_matching(&mut self, matching: Option<LogIdOf<C>>, inflight_id: Option<InflightId>) {
tracing::debug!(
"update_matching: current progress_entry: {}; matching: {}",
self.entry,
matching.display()
);
if let Some(inflight_id) = inflight_id {
self.entry.inflight.ack(matching.clone(), inflight_id);
}
if inflight_id.is_none() && matching <= self.entry.matching {
return;
}
debug_assert!(matching.as_ref() >= self.entry.matching());
self.entry.matching = matching;
let matching_next = self.entry.matching().next_index();
self.entry.searching_end = std::cmp::max(self.entry.searching_end, matching_next);
}
}