use display_more::DisplayOptionExt;
use display_more::DisplayResultExt;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::ServerState;
use crate::display_ext::DisplayInstantExt;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::engine::TargetProgress;
use crate::engine::handler::log_handler::LogHandler;
use crate::errors::NodeNotFound;
use crate::errors::Operation;
use crate::progress;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::progress::entry::ProgressEntry;
use crate::progress::inflight_id::InflightId;
use crate::progress::stream_id::StreamId;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::replication::replicate::Replicate;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::CommittedVoteOf;
use crate::type_config::alias::EffectiveMembershipOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::vote::raft_vote::RaftVoteExt;
#[cfg(test)]
mod append_membership_test;
#[cfg(test)]
mod update_matching_test;
/// Borrowed view over the engine that performs leader-side replication bookkeeping.
///
/// The handler owns nothing: every field is a mutable borrow, valid for `'x`, of a piece of
/// engine state. `SM` is forwarded to [`EngineOutput`] and defaults to `()`.
pub(crate) struct ReplicationHandler<'x, C: RaftTypeConfig, SM = ()> {
    /// Engine configuration; this node's `id` and `max_payload_entries` are read from it.
    pub(crate) config: &'x mut EngineConfig<C>,

    /// Leader-only state: replication progress, clock progress and the committed vote.
    pub(crate) leader: &'x mut Leader<C, LeaderQuorumSet<C>>,

    /// The Raft state of this node (log ids, membership, committed log id, I/O state).
    pub(crate) state: &'x mut RaftState<C>,

    /// Sink for the commands produced while handling replication events.
    pub(crate) output: &'x mut EngineOutput<C, SM>,
}
// Leader-only operations on replication state: membership changes, progress tracking,
// commit advancement, scheduling replication and purging logs.
impl<C, SM> ReplicationHandler<'_, C, SM>
where C: RaftTypeConfig
{
    /// Append the membership config `m`, whose log entry is at `log_id`, and reconfigure
    /// replication for it.
    ///
    /// Only the leader may call this (checked by `debug_assert!` in debug builds). After
    /// appending the new effective membership it rebuilds the progress trackers, rebuilds the
    /// replication streams without closing the existing ones, and then starts replication to
    /// every target that has something to send.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn append_membership(&mut self, log_id: &LogIdOf<C>, m: &Membership<C::NodeId, C::Node>) {
        tracing::debug!("update effective membership: log_id:{} {}", log_id, m);

        debug_assert!(
            self.state.server_state == ServerState::Leader,
            "Only leader is allowed to call append_membership()"
        );
        debug_assert!(
            self.state.is_leader(&self.config.id),
            "Only leader is allowed to call append_membership()"
        );

        self.state
            .membership_state
            .append(EffectiveMembershipOf::<C>::new_arc(Some(log_id.clone()), m.clone()));

        self.rebuild_progresses();
        // `false`: keep existing streams and their in-flight state untouched.
        self.rebuild_replication_streams(false);
        self.initiate_replication();
    }

    /// Rebuild `leader.progress` and `leader.clock_progress` for the current effective
    /// membership: its quorum set plus its learners.
    ///
    /// Entries that `upgrade_quorum_set` has to create come from the default closures passed
    /// in. A replication entry is created with `ProgressEntry::empty`, a freshly generated
    /// [`StreamId`] and `end = last_log_id().next_index()`. A clock entry starts as `None`,
    /// meaning no time has been recorded. How existing entries are carried over is decided by
    /// `upgrade_quorum_set`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn rebuild_progresses(&mut self) {
        let em = self.state.membership_state.effective();

        let learner_ids = em.learner_ids().collect::<Vec<_>>();

        {
            let end = self.state.last_log_id().next_index();

            let old_progress = self.leader.progress.clone();
            let id_gen = self.state.progress_id_gen.clone();

            // Every newly created entry gets its own stream id.
            let default_v = || {
                let progress_id = StreamId::new(id_gen.next_id());
                ProgressEntry::empty(progress_id, end)
            };

            self.leader.progress =
                old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids.clone(), default_v);
        }

        {
            let old_progress = self.leader.clock_progress.clone();

            self.leader.clock_progress =
                old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids, || None);
        }
    }

    /// Advance `target`'s entry in the leader's clock progress to `sending_time`.
    ///
    /// Does nothing unless `stream_id` is accepted by `Leader::is_replication_stream_valid`
    /// for `target`, so responses from another stream (e.g. a replaced one) cannot move the
    /// clock.
    ///
    /// # Panics
    /// Panics if the stream is valid but `target` has no clock-progress entry.
    pub(crate) fn try_update_leader_clock(
        &mut self,
        stream_id: StreamId,
        target: C::NodeId,
        sending_time: InstantOf<C>,
    ) {
        tracing::debug!("{}: target: {}, t: {}", func_name!(), &target, sending_time.display());

        if !self.leader.is_replication_stream_valid(&target, stream_id) {
            return;
        }

        // `granted` is the clock value the quorum agrees on after the update; it is only logged.
        let granted = *self
            .leader
            .clock_progress
            .increase_to(&target, Some(sending_time))
            .expect("it should always update existing progress");

        tracing::debug!(
            "granted leader vote clock after updating: granted: {}; clock_progress: {}",
            granted.as_ref().map(|x| x.display()).display(),
            self.leader
                .clock_progress
                .display_with(|f, id, v| { write!(f, "{}: {}", id, v.as_ref().map(|x| x.display()).display()) })
        );
    }

    /// Update `node_id`'s matching log id to `log_id` for the in-flight request `inflight_id`,
    /// then try to commit the resulting quorum-accepted log id.
    ///
    /// `log_id` must be `Some` (checked by `debug_assert!`). If `update_with` returns `Err`
    /// (e.g. `node_id` is not tracked), nothing else happens.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn update_matching(
        &mut self,
        node_id: C::NodeId,
        log_id: Option<LogIdOf<C>>,
        inflight_id: Option<InflightId>,
    ) {
        tracing::debug!("{}: node_id: {}, log_id: {}", func_name!(), node_id, log_id.display());

        debug_assert!(log_id.is_some(), "a valid update can never set matching to None");

        let Ok(quorum_accepted) = self.leader.progress.update_with(&node_id, |prog_entry| {
            prog_entry.new_updater(&*self.config).update_matching(log_id, inflight_id)
        }) else {
            return;
        };

        // Clone to release the borrow of `self.leader.progress` before committing.
        let quorum_accepted = quorum_accepted.clone();

        tracing::debug!(
            "after updating progress: quorum_accepted: {}",
            quorum_accepted.display()
        );

        self.try_commit_quorum_accepted(quorum_accepted);
    }

    /// Try to commit `granted`, the log id accepted by a quorum.
    ///
    /// A `Some` log id whose committed leader id is not the current vote's leader is ignored.
    /// A Raft leader only commits entries of its own term by counting replicas. Otherwise,
    /// `cluster_committed` is updated (best effort). When the local committed log id changes,
    /// a `ReplicateCommitted` command carrying the new committed log id is emitted.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option<LogIdOf<C>>) {
        if let Some(ref c) = granted
            && !self.state.vote_ref().is_same_leader(c.committed_leader_id())
        {
            return;
        }

        let committed = LogIOId::new(self.state.vote_ref().to_committed(), granted.clone());

        // The result is deliberately discarded. NOTE(review): `try_update` presumably rejects a
        // value that does not advance `cluster_committed`, which is harmless here. Confirm.
        self.state.io_state_mut().cluster_committed.try_update(committed).ok();

        // `update_local_committed` returns `Some` when the local committed log id changed.
        if self.state.update_local_committed(&granted).is_some() {
            self.output.push_command(Command::ReplicateCommitted {
                committed: self.state.committed().cloned(),
            });
        }
    }

    /// Apply a conflict reported by `target` at `conflict` for the in-flight request
    /// `inflight_id`.
    ///
    /// Only `conflict.index()` is forwarded to the progress updater. Does nothing if `target`
    /// is not in the progress tracker.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn update_conflicting(
        &mut self,
        target: C::NodeId,
        conflict: LogIdOf<C>,
        inflight_id: Option<InflightId>,
    ) {
        let Some(prog_entry) = self.leader.progress.get_mut(&target) else {
            return;
        };

        let mut updater = progress::entry::update::Updater::new(self.config, prog_entry);

        updater.update_conflicting(conflict.index(), inflight_id);
    }

    /// Set `allow_log_reversion` on `target`'s progress entry.
    ///
    /// # Errors
    /// Returns [`NodeNotFound`] with [`Operation::AllowNextRevert`] if `target` is not in the
    /// progress tracker. A warning is also logged in that case.
    pub(crate) fn allow_next_revert(&mut self, target: C::NodeId, allow: bool) -> Result<(), NodeNotFound<C::NodeId>> {
        let Some(prog_entry) = self.leader.progress.get_mut(&target) else {
            tracing::warn!(
                "target node {} not found in progress tracker, when {}",
                target,
                func_name!()
            );
            return Err(NodeNotFound::new(target, Operation::AllowNextRevert));
        };

        prog_entry.allow_log_reversion = allow;

        Ok(())
    }

    /// Handle a replication result from `target` and then try to purge logs.
    ///
    /// - `Ok(Ok(matching))`: update the matching log id (see [`Self::update_matching`]).
    /// - `Ok(Err(conflict))`: handle the conflict (see [`Self::update_conflicting`]).
    /// - `Err(err_str)`: log a warning and clear `target`'s in-flight record, if it has one.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn update_progress(
        &mut self,
        target: C::NodeId,
        repl_res: Result<ReplicationResult<C>, String>,
        inflight_id: Option<InflightId>,
    ) {
        tracing::debug!(
            "{}: target: {}, result: {}, inflight_id: {}, current progresses: {}",
            func_name!(),
            target,
            repl_res.display(),
            inflight_id.display(),
            self.leader.progress
        );

        match repl_res {
            Ok(p) => match p.0 {
                Ok(matching) => {
                    self.update_matching(target, matching, inflight_id);
                }
                Err(conflict) => {
                    self.update_conflicting(target, conflict, inflight_id);
                }
            },
            Err(err_str) => {
                tracing::warn!("update progress error: {}", err_str);

                // NOTE(review): clearing the in-flight record presumably lets a later
                // `next_send` schedule the data again. Confirm.
                if let Some(p) = self.leader.progress.get_mut(&target) {
                    p.inflight = Inflight::None;
                }
            }
        }

        self.try_purge_log();
    }

    /// Emit a `RebuildReplicationStreams` command for every tracked node except this one.
    ///
    /// When `close_old` is true, each target's in-flight record is cleared first, and the
    /// command asks to close the old streams.
    ///
    /// # Panics
    /// Panics if a tracked node has no node entry in the effective membership. The progress
    /// tracker is built from that membership in `rebuild_progresses`, so this would indicate a
    /// broken invariant.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn rebuild_replication_streams(&mut self, close_old: bool) {
        let mut targets = vec![];

        let membership = self.state.membership_state.effective();

        for item in self.leader.progress.iter_mut() {
            // This node does not replicate to itself.
            if item.id != self.config.id {
                if close_old {
                    item.val.inflight = Inflight::None;
                }

                let target_node = membership
                    .get_node(&item.id)
                    .expect("every node in the progress tracker must be a node of the effective membership")
                    .clone();

                targets.push(TargetProgress {
                    target: item.id.clone(),
                    target_node,
                    progress: item.val.clone(),
                });
            }
        }

        self.output.push_command(Command::RebuildReplicationStreams {
            leader_vote: self.leader.committed_vote.clone(),
            targets,
            close_old_streams: close_old,
        });
    }

    /// For every tracked node except this one, ask its progress entry for the next data to
    /// send (at most `max_payload_entries`) and emit a replication command for it.
    ///
    /// Targets for which `next_send` returns `Err` are only logged.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn initiate_replication(&mut self) {
        tracing::debug!("{}: progress: {:?}", func_name!(), self.leader.progress);

        for item in self.leader.progress.iter_mut() {
            if item.id == self.config.id {
                continue;
            }

            let t = item.val.next_send(self.state, self.config.max_payload_entries);
            tracing::debug!("next send: target: {}, send: {:?}", item.id, t);

            match t {
                Ok(inflight) => {
                    let leader_vote = self.leader.committed_vote.clone();
                    Self::send_to_target(self.output, leader_vote, &item.id, inflight);
                }
                Err(e) => {
                    tracing::debug!("no data to replicate for node-{}: current inflight: {:?}", item.id, e,);
                }
            }
        }
    }

    /// Push the command that sends `inflight` to `target` into `output`.
    ///
    /// `Logs` and `LogsSince` become a `Command::Replicate`, and `Snapshot` becomes a
    /// `Command::ReplicateSnapshot` (the only case that uses `leader_vote`).
    ///
    /// # Panics
    /// Panics if `inflight` is `Inflight::None`; callers must only pass data to send.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn send_to_target(
        output: &mut EngineOutput<C, SM>,
        leader_vote: CommittedVoteOf<C>,
        target: &C::NodeId,
        inflight: &Inflight<C>,
    ) {
        match inflight {
            Inflight::None => unreachable!("no data to send"),
            Inflight::Logs {
                log_id_range,
                inflight_id,
            } => {
                let req = Replicate::new_logs(log_id_range.clone(), *inflight_id);
                output.push_command(Command::Replicate {
                    target: target.clone(),
                    req,
                });
            }
            Inflight::Snapshot { inflight_id } => {
                output.push_command(Command::ReplicateSnapshot {
                    leader_vote,
                    target: target.clone(),
                    inflight_id: *inflight_id,
                });
            }
            Inflight::LogsSince { prev, inflight_id } => {
                let req = Replicate::new_logs_since(prev.clone(), *inflight_id);
                output.push_command(Command::Replicate {
                    target: target.clone(),
                    req,
                });
            }
        }
    }

    /// Purge logs up to `purge_upto` if that is beyond `last_purged_log_id` and no tracked
    /// node has it in an in-flight log range.
    ///
    /// Every node that still uses the log is logged before giving up.
    #[tracing::instrument(level = "debug", skip_all)]
    pub(crate) fn try_purge_log(&mut self) {
        tracing::debug!(
            "try_purge_log: last_purged_log_id: {}, purge_upto: {}",
            self.state.last_purged_log_id().display(),
            self.state.purge_upto().display()
        );

        if self.state.purge_upto() <= self.state.last_purged_log_id() {
            tracing::debug!("no need to purge, return");
            return;
        }

        // `Option` orders `None` below every `Some`, so a `None` `purge_upto` would have
        // returned above.
        let purge_upto = self.state.purge_upto().expect("purge_upto > last_purged_log_id implies Some").clone();

        let mut in_use = false;
        // Deliberately scans all nodes (no early exit) so that every user of the log is logged.
        for item in self.leader.progress.iter() {
            if item.val.is_log_range_inflight(&purge_upto) {
                tracing::debug!("log {} is in use by {}", purge_upto, item.id);
                in_use = true;
            }
        }

        if in_use {
            tracing::debug!("cannot purge: {} is in use", purge_upto);
            return;
        }

        self.log_handler().purge_log();
    }

    /// Record that this node (the leader) has logs up to `upto` locally.
    ///
    /// Does nothing if `upto` is `None`, this node has no progress entry, or its matching log id
    /// is already at or beyond `upto`. Otherwise it installs an in-flight record for
    /// `(None, upto]` under a fixed id and feeds `upto` through [`Self::update_matching`] with
    /// that id. This node has no replication stream of its own (see `initiate_replication`).
    pub(crate) fn update_local_progress(&mut self, upto: Option<LogIdOf<C>>) {
        tracing::debug!("{}: upto: {}", func_name!(), upto.display());

        if upto.is_none() {
            return;
        }

        let id = self.config.id.clone();

        if let Some(prog_entry) = self.leader.progress.get_mut(&id) {
            tracing::debug!("update progress: self_matching: {}", prog_entry.matching().display());

            if prog_entry.matching() >= upto.as_ref() {
                return;
            }

            // The same id is used for the in-flight record and for acknowledging it.
            let inflight_id = InflightId::new(0);
            prog_entry.inflight = Inflight::logs(None, upto.clone(), inflight_id);

            self.update_matching(id, upto, Some(inflight_id));
        }
    }

    /// Create a [`LogHandler`] that reborrows this handler's `config`, `state` and `output`.
    pub(crate) fn log_handler(&mut self) -> LogHandler<'_, C, SM> {
        LogHandler {
            config: self.config,
            state: self.state,
            output: self.output,
        }
    }
}