use display_more::DisplayOptionExt;
use crate::LogIdOptionExt;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::log_id::option_ref_log_id_ext::OptionRefLogIdExt;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::LogIdOf;
#[cfg(test)]
mod calc_purge_upto_test;
#[cfg(test)]
mod purge_log_test;
/// Handler for log-purging operations.
///
/// It holds mutable borrows of the config, the Raft state and the output buffer for the
/// lifetime `'x`. Its methods decide which logs may be purged, record that decision in
/// `state`, and queue the resulting command in `output`.
pub(crate) struct LogHandler<'x, C, SM = ()>
where C: RaftTypeConfig
{
/// Configuration; `max_in_snapshot_log_to_keep` and `purge_batch_size` are read from it
/// when the purge target is calculated.
pub(crate) config: &'x mut EngineConfig<C>,
/// Raft state: source of the snapshot's last log id and the last purged log id, and the
/// place where the purge target (`purge_upto`) is stored.
pub(crate) state: &'x mut RaftState<C>,
/// Output buffer that receives `Command::PurgeLog` when logs are actually purged.
pub(crate) output: &'x mut EngineOutput<C, SM>,
}
impl<C, SM> LogHandler<'_, C, SM>
where C: RaftTypeConfig
{
    /// Purge logs up to the target returned by `self.state.purge_upto()`.
    ///
    /// Does nothing if that target is not greater than the last purged log id.
    /// Otherwise the state is updated via `purge_log()` and a [`Command::PurgeLog`]
    /// carrying the same log id is pushed to `output`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn purge_log(&mut self) {
        let st = &mut self.state;
        let purge_upto = st.purge_upto();

        tracing::info!(
            "purge log, last_purged: {}, purge_upto: {}",
            st.last_purged_log_id().display(),
            purge_upto.display()
        );

        if purge_upto <= st.last_purged_log_id() {
            return;
        }

        // `None` never compares greater than any `Option`, so passing the guard above
        // implies `purge_upto` is `Some` and this `unwrap()` cannot fail.
        let upto = purge_upto.unwrap().clone();

        st.purge_log(&upto);
        self.output.push_command(Command::PurgeLog { upto });
    }

    /// Calculate a purge target from the configured policy and, if there is one, record it.
    ///
    /// This only updates the target stored in the state (see [`Self::update_purge_upto`]);
    /// it does not push any command to `output`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn schedule_policy_based_purge(&mut self) {
        if let Some(purge_upto) = self.calc_purge_upto() {
            self.update_purge_upto(purge_upto);
        }
    }

    /// Store `purge_upto` as the log id that logs are expected to be purged up to.
    ///
    /// Only the state field is written here; no purge command is issued.
    /// In debug builds it asserts that the new target is not smaller than the current one.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn update_purge_upto(&mut self, purge_upto: LogIdOf<C>) {
        debug_assert!(self.state.purge_upto() <= Some(&purge_upto));
        self.state.purge_upto = Some(purge_upto);
    }

    /// Calculate the log id up to which logs can be purged according to the config.
    ///
    /// The exclusive end index of the purgeable range is the index following the snapshot's
    /// last log id, minus `max_in_snapshot_log_to_keep` (saturating at 0). Thus only logs
    /// covered by the snapshot are considered.
    ///
    /// Returns `None` when fewer than `purge_batch_size` logs lie between the last purged
    /// log and that end index, or when the range is empty. Otherwise returns the log id
    /// found at the last index of the range.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn calc_purge_upto(&self) -> Option<LogIdOf<C>> {
        let st = &self.state;
        let max_keep = self.config.max_in_snapshot_log_to_keep;
        let batch_size = self.config.purge_batch_size;

        let purge_end = self.state.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep);

        tracing::debug!(
            "calculate purge range: up to index {}, snapshot_last_log_id: {:?}, max_keep: {}",
            purge_end,
            self.state.snapshot_meta.last_log_id,
            max_keep
        );

        // Saturate instead of overflowing: a huge batch size must read as "batch not full",
        // not wrap around to a small value.
        if st.last_purged_log_id().next_index().saturating_add(batch_size) > purge_end {
            tracing::debug!(
                "skip purge: batch not full, snapshot_last_log_id: {:?}, max_keep: {}, last_purged: {}, batch_size: {}, purge_end: {}",
                self.state.snapshot_meta.last_log_id,
                max_keep,
                st.last_purged_log_id().display(),
                batch_size,
                purge_end
            );
            return None;
        }

        // `purge_end` is exclusive, so the last index to purge is `purge_end - 1`.
        // With a batch size of 0 and nothing purged yet, `purge_end` can be 0 here;
        // there is nothing to purge then, and subtracting would underflow.
        let purge_index = purge_end.checked_sub(1)?;

        let log_id = self.state.log_ids.ref_at(purge_index);
        debug_assert!(
            log_id.is_some(),
            "log id not found at {}, engine.state:{:?}",
            purge_index,
            st
        );

        log_id.to_log_id()
    }
}