pub(crate) mod update;
use std::borrow::Borrow;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use display_more::DisplayOptionExt;
use validit::Validate;
use crate::LogIdOptionExt;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::engine::EngineConfig;
use crate::progress::entry::update::Updater;
use crate::progress::inflight::Inflight;
use crate::progress::stream_id::StreamId;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::LogIdOf;
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct ProgressEntry<C>
where C: RaftTypeConfig
{
pub(crate) stream_id: StreamId,
pub(crate) matching: Option<LogIdOf<C>>,
pub(crate) inflight: Inflight<C>,
pub(crate) searching_end: u64,
pub(crate) allow_log_reversion: bool,
}
impl<C> ProgressEntry<C>
where C: RaftTypeConfig
{
#[allow(dead_code)]
pub(crate) fn testing_new(matching: Option<LogIdOf<C>>) -> Self {
Self {
stream_id: StreamId::new(0),
matching: matching.clone(),
inflight: Inflight::None,
searching_end: matching.next_index(),
allow_log_reversion: false,
}
}
pub(crate) fn empty(stream_id: StreamId, end: u64) -> Self {
Self {
stream_id,
matching: None,
inflight: Inflight::None,
searching_end: end,
allow_log_reversion: false,
}
}
#[allow(dead_code)]
pub(crate) fn with_inflight(mut self, inflight: Inflight<C>) -> Self {
debug_assert_eq!(self.inflight, Inflight::None);
self.inflight = inflight;
self
}
pub(crate) fn new_updater<'a>(&'a mut self, engine_config: &'a EngineConfig<C>) -> Updater<'a, C> {
Updater::new(engine_config, self)
}
pub(crate) fn matching(&self) -> Option<&LogIdOf<C>> {
self.matching.as_ref()
}
pub(crate) fn is_log_range_inflight(&self, upto: &LogIdOf<C>) -> bool {
match &self.inflight {
Inflight::None => false,
Inflight::Logs { log_id_range, .. } => {
let lid = Some(upto);
lid > log_id_range.prev.as_ref()
}
Inflight::Snapshot { inflight_id: _ } => false,
Inflight::LogsSince { prev, .. } => {
let lid = Some(upto);
lid > prev.as_ref()
}
}
}
pub(crate) fn next_send(
&mut self,
log_state: &mut RaftState<C>,
max_entries: u64,
) -> Result<&Inflight<C>, &Inflight<C>> {
if !self.inflight.is_none() {
return Err(&self.inflight);
}
let last_next = log_state.last_log_id().next_index();
debug_assert!(
self.searching_end <= last_next,
"expect: searching_end: {} <= last_log_id.next_index: {}",
self.searching_end,
last_next
);
let purge_upto_next = {
let purge_upto = log_state.purge_upto();
purge_upto.next_index()
};
if self.searching_end < purge_upto_next {
let inflight_id = log_state.new_inflight_id();
self.inflight = Inflight::snapshot(inflight_id);
return Ok(&self.inflight);
}
let matching_next = self.matching().next_index();
let mut start = Self::calc_mid(matching_next, self.searching_end);
if start < purge_upto_next {
start = purge_upto_next;
}
if start == matching_next && matching_next == self.searching_end {
let prev = log_state.prev_log_id(start);
let inflight_id = log_state.new_inflight_id();
self.inflight = Inflight::LogsSince { prev, inflight_id };
return Ok(&self.inflight);
}
let end = std::cmp::min(start + max_entries, last_next);
if start == end {
self.inflight = Inflight::None;
return Err(&self.inflight);
}
let prev = log_state.prev_log_id(start);
let last = log_state.prev_log_id(end);
let inflight_id = log_state.new_inflight_id();
self.inflight = Inflight::logs(prev, last, inflight_id);
Ok(&self.inflight)
}
#[allow(dead_code)]
pub(crate) fn sending_start(&self) -> (u64, u64) {
let mid = Self::calc_mid(self.matching().next_index(), self.searching_end);
(mid, self.searching_end)
}
fn calc_mid(matching_next: u64, end: u64) -> u64 {
debug_assert!(matching_next <= end);
let d = end - matching_next;
let offset = d / 16 * 8;
matching_next + offset
}
}
impl<C> Borrow<Option<LogIdOf<C>>> for ProgressEntry<C>
where C: RaftTypeConfig
{
fn borrow(&self) -> &Option<LogIdOf<C>> {
&self.matching
}
}
impl<C> Display for ProgressEntry<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{P({})[{}, {}), inflight:{}}}",
self.stream_id,
self.matching().display(),
self.searching_end,
self.inflight,
)
}
}
impl<C> Validate for ProgressEntry<C>
where C: RaftTypeConfig
{
fn validate(&self) -> Result<(), Box<dyn Error>> {
validit::less_equal!(self.matching().next_index(), self.searching_end);
self.inflight.validate()?;
match &self.inflight {
Inflight::None => {}
Inflight::Logs { log_id_range, .. } => {
validit::less_equal!(self.matching(), log_id_range.prev.as_ref());
validit::less_equal!(log_id_range.prev.next_index(), self.searching_end);
}
Inflight::Snapshot { inflight_id: _ } => {}
Inflight::LogsSince { .. } => {}
}
Ok(())
}
}
#[cfg(test)]
mod tests;