use std::sync::Arc;
use display_more::DisplayOptionExt;
use display_more::DisplaySliceExt;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::batch::Batch;
use crate::core::sm;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::entry::raft_entry_ext::RaftEntryExt;
use crate::errors::ConflictingLogId;
use crate::errors::RejectAppendEntries;
use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::type_config::alias::CommittedVoteOf;
use crate::type_config::alias::EffectiveMembershipOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::StoredMembershipOf;
use crate::vote::raft_vote::RaftVoteExt;
#[cfg(test)]
mod append_entries_test;
#[cfg(test)]
mod do_append_entries_test;
#[cfg(test)]
mod install_snapshot_test;
#[cfg(test)]
mod truncate_logs_test;
#[cfg(test)]
mod update_committed_membership_test;
/// Handler bundling a leader's committed vote with mutable access to the engine's config, state and output.
///
/// Its methods append leader-supplied entries, truncate local logs, update membership state and
/// install full snapshots.
pub(crate) struct FollowingHandler<'x, C, SM = ()>
where C: RaftTypeConfig
{
/// Vote attached to the log IO ids and `AppendEntries` commands this handler produces.
pub(crate) leader_vote: CommittedVoteOf<C>,
/// Engine configuration, handed on to the sub-handlers built by this handler.
pub(crate) config: &'x mut EngineConfig<C>,
/// Raft state that the handler reads and mutates.
pub(crate) state: &'x mut RaftState<C>,
/// Sink for the commands this handler emits.
pub(crate) output: &'x mut EngineOutput<C, SM>,
}
impl<C, SM> FollowingHandler<'_, C, SM>
where C: RaftTypeConfig
{
/// Handle a batch of `entries` that is expected to directly follow `prev_log_id`.
///
/// The greater of `prev_log_id` and the last entry's log id is first recorded as accepted log IO
/// under `leader_vote`. Then:
///
/// - if `first_conflicting_index` points inside `entries`, local logs are truncated from that
///   entry's index and the remaining tail of `entries` is appended;
/// - otherwise nothing is written, and an `UpdateIOProgress` command is queued unless the new IO
///   id is not greater than the previously accepted one.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn append_entries(&mut self, prev_log_id: Option<LogIdOf<C>>, mut entries: Vec<C::Entry>) {
    tracing::debug!(
        "{}: local last_log_id: {}, request: prev_log_id: {}, entries: {}",
        func_name!(),
        self.state.last_log_id().display(),
        prev_log_id.display(),
        entries.display(),
    );

    // A non-empty batch must start right after `prev_log_id`.
    debug_assert!(entries.first().is_none_or(|ent| ent.index() == prev_log_id.next_index()));

    let tail_log_id = entries.last().map(|ent| ent.log_id());
    let last_log_id = Ord::max(prev_log_id, tail_log_id);

    let accepting = IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone());
    let prev_accepted = self.state.accept_log_io(accepting);

    let total = entries.len();
    let first_new = self.state.first_conflicting_index(&entries);

    if first_new >= total {
        // Nothing to write: only the IO progress may need to move forward.
        let to_submit = IOId::new_log_io(self.leader_vote.clone(), last_log_id);

        let nothing_new = Some(&to_submit) <= prev_accepted.as_ref();
        if nothing_new {
            return;
        }

        // The update is conditioned on the previously accepted IO being flushed, if there is one.
        let when = prev_accepted.map(|io_id| Condition::IOFlushed { io_id });
        self.output.push_command(Command::UpdateIOProgress {
            when,
            io_id: to_submit,
        });
        return;
    }

    // Drop local logs from the first differing entry before writing the new ones.
    self.truncate_logs(entries[first_new].index());

    let to_append = entries.split_off(first_new);
    self.do_append_entries(to_append);
}
/// Check that `prev_log_id`, when given, is present in the local log.
///
/// Returns `Ok(())` if `prev_log_id` is `None` or `has_log_id` reports it as present.
///
/// # Errors
///
/// If it is absent, local logs are truncated from `prev_log_id`'s index and
/// `RejectAppendEntries::ConflictingLogId` is returned, carrying the local log id found at that
/// index (if any) and the expected one.
pub(crate) fn ensure_log_consecutive(
    &mut self,
    prev_log_id: Option<&LogIdOf<C>>,
) -> Result<(), RejectAppendEntries<C>> {
    let Some(prev) = prev_log_id else {
        return Ok(());
    };

    if self.state.has_log_id(prev) {
        return Ok(());
    }

    let local = self.state.get_log_id(prev.index());
    tracing::debug!("prev_log_id mismatch, local: {}", local.display());

    self.truncate_logs(prev.index());

    let conflict = ConflictingLogId {
        local,
        expect: prev.clone(),
    };
    Err(RejectAppendEntries::ConflictingLogId(conflict))
}
/// Append `entries` to the local log without any conflict checking.
///
/// Extends the in-memory log ids, applies membership configs found in `entries`, and queues an
/// `AppendEntries` command tagged with `leader_vote`.
///
/// Debug builds assert that `entries` is non-empty, starts at the index right after the last
/// local log id, and that its first log id is greater than the last local one.
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn do_append_entries(&mut self, entries: Vec<C::Entry>) {
    debug_assert!(!entries.is_empty());
    debug_assert_eq!(entries[0].index(), self.state.log_ids.last().next_index());
    debug_assert!(Some(entries[0].ref_log_id()) > self.state.log_ids.last_ref());

    let new_log_ids = entries.iter().map(|entry| entry.ref_log_id());
    self.state.extend_log_ids(new_log_ids);

    self.append_membership(entries.iter());

    let append = Command::AppendEntries {
        committed_vote: self.leader_vote.clone(),
        entries: Batch::of(entries),
    };
    self.output.push_command(append);
}
/// Truncate local log state at index `since`.
///
/// Does nothing if there is no local log id at `since`. Otherwise truncates the in-memory log
/// ids, queues a `TruncateLog` command carrying the log id returned by `prev_log_id(since)`, and
/// truncates the membership state; the server state is re-evaluated if that truncation reports a
/// change.
#[tracing::instrument(level = "debug", skip(self))]
fn truncate_logs(&mut self, since: u64) {
    tracing::debug!("truncate logs since index {}", since);

    debug_assert!(since >= self.state.last_purged_log_id().next_index());

    let present = self.state.get_log_id(since).is_some();
    if !present {
        tracing::debug!("skip truncating absent log at index {}", since);
        return;
    }

    // Must be looked up before the log ids are truncated.
    let after = self.state.prev_log_id(since);

    self.state.log_ids.truncate(since);
    self.output.push_command(Command::TruncateLog { after });

    let membership_changed = self.state.membership_state.truncate(since).is_some();
    if membership_changed {
        self.server_state_handler().update_server_state_if_changed();
    }
}
/// Apply the membership configs found in `entries` to the membership state.
///
/// Only the configs returned by `last_two_memberships` are appended, in order. If there are
/// none, nothing happens; otherwise the server state is re-evaluated afterwards.
fn append_membership<'a>(&mut self, entries: impl DoubleEndedIterator<Item = &'a C::Entry>)
where C::Entry: 'a {
    let found = Self::last_two_memberships(entries);
    if found.is_empty() {
        return;
    }

    for (nth, stored) in found.into_iter().enumerate() {
        tracing::debug!("apply membership config #{} from leader: {}", nth, stored);

        let effective = EffectiveMembershipOf::<C>::new_from_stored_membership(stored);
        self.state.membership_state.append(Arc::new(effective));
    }

    tracing::debug!("membership state updated: {}", self.state.membership_state);

    self.server_state_handler().update_server_state_if_changed();
}
/// Record `membership` as the committed membership and re-evaluate the server state.
///
/// The value returned by `update_committed` is currently not acted upon.
#[tracing::instrument(level = "debug", skip_all)]
fn update_committed_membership(&mut self, membership: EffectiveMembershipOf<C>) {
    tracing::debug!("update committed membership: {}", membership);

    let _effective_changed = self.state.membership_state.update_committed(Arc::new(membership));

    self.server_state_handler().update_server_state_if_changed();
}
/// Install a complete snapshot.
///
/// Returns `None` without installing when the snapshot's `last_log_id` is not greater than the
/// committed log id, or when the snapshot handler reports that the snapshot meta was not updated.
///
/// Otherwise, in order:
///
/// 1. if the local log id at the snapshot's last index differs from the snapshot's, logs after
///    the committed log id are truncated;
/// 2. the snapshot's last log id is accepted as log IO, apply progress and snapshot progress;
/// 3. the committed membership is replaced by the snapshot's;
/// 4. the state-machine install command is queued;
/// 5. logs up to the snapshot's last log id are scheduled for purge.
///
/// Returns `Some(Condition::Snapshot { .. })` for the snapshot's last log id.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_full_snapshot(&mut self, snapshot: SnapshotOf<C>) -> Option<Condition<C>> {
    let meta = &snapshot.meta;
    tracing::info!("install full snapshot, meta: {:?}", meta);

    let snap_last_log_id = meta.last_log_id.clone();

    let covered_by_committed = snap_last_log_id.as_ref() <= self.state.committed();
    if covered_by_committed {
        tracing::info!(
            "No need to install snapshot; snapshot last_log_id({}) <= committed({})",
            snap_last_log_id.display(),
            self.state.committed().display()
        );
        return None;
    }

    // A `None` last log id never compares greater than `committed()`, so it has returned above.
    let snap_last_log_id = snap_last_log_id.unwrap();

    if !self.snapshot_handler().update_snapshot(meta.clone()) {
        return None;
    }

    let conflicts = self
        .state
        .get_log_id(snap_last_log_id.index())
        .is_some_and(|local| local != snap_last_log_id);
    if conflicts {
        // Local log disagrees with the snapshot: drop everything after the committed log id.
        let first_uncommitted = self.state.committed().next_index();
        self.truncate_logs(first_uncommitted);
    }

    let accepted_io = LogIOId::new(self.leader_vote.to_committed(), Some(snap_last_log_id.clone()));
    self.state.accept_log_io(accepted_io.to_io_id());
    self.state.apply_progress_mut().accept(snap_last_log_id.clone());
    self.state.snapshot_progress_mut().accept(snap_last_log_id.clone());

    let snapshot_membership =
        EffectiveMembershipOf::<C>::new_from_stored_membership(meta.last_membership.clone());
    self.update_committed_membership(snapshot_membership);

    let install = sm::Command::install_full_snapshot(snapshot, accepted_io);
    self.output.push_command(Command::from(install));

    self.state.purge_upto = Some(snap_last_log_id.clone());
    self.log_handler().purge_log();

    Some(Condition::Snapshot {
        log_id: snap_last_log_id,
    })
}
/// Collect at most two membership configs from `entries`, taken from the back.
///
/// Entries are scanned in reverse and scanning stops once two membership entries have been
/// found. The result is returned in the entries' original (forward) order, each config paired
/// with the log id of the entry that carried it.
fn last_two_memberships<'a>(entries: impl DoubleEndedIterator<Item = &'a C::Entry>) -> Vec<StoredMembershipOf<C>>
where C::Entry: 'a {
    let mut found: Vec<StoredMembershipOf<C>> = entries
        .rev()
        .filter_map(|ent| {
            let membership = ent.get_membership()?;
            Some(StoredMembershipOf::<C>::new(Some(ent.log_id()), membership))
        })
        .take(2)
        .collect();

    // Collected newest-first; restore forward order.
    found.reverse();
    found
}
/// Build a `LogHandler` that reborrows this handler's config, state and output.
fn log_handler(&mut self) -> LogHandler<'_, C, SM> {
LogHandler {
config: self.config,
state: self.state,
output: self.output,
}
}
/// Build a `SnapshotHandler` that reborrows this handler's state and output.
fn snapshot_handler(&mut self) -> SnapshotHandler<'_, '_, C, SM> {
SnapshotHandler {
state: self.state,
output: self.output,
}
}
/// Build a `ServerStateHandler` that reborrows this handler's config and state.
fn server_state_handler(&mut self) -> ServerStateHandler<'_, C> {
ServerStateHandler {
config: self.config,
state: self.state,
}
}
}