use crate::{AppData, AppDataResponse, RaftNetwork, RaftStorage};
use crate::error::RaftResult;
use crate::raft::{AppendEntriesRequest, AppendEntriesResponse, ConflictOpt, Entry, EntryPayload};
use crate::core::{RaftCore, State, UpdateCurrentLeader};
impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
    /// Handle an incoming AppendEntries RPC.
    ///
    /// The request is rejected (`success: false`, no conflict hint) when its term is lower than
    /// this node's current term. Otherwise the election timeout is refreshed, the node adopts the
    /// request's commit index, term and leader, and reverts to follower unless it is already a
    /// follower or a non-voter.
    ///
    /// Entries are appended directly when `prev_log_index` is `0` or when the request's previous
    /// index/term equal this node's last log index/term. In every other case the local entry at
    /// `prev_log_index` is inspected:
    ///
    /// * no such entry: respond `success: false` with this node's last log index/term as the
    ///   conflict hint;
    /// * entry present with a matching term: any later local entries are deleted, the membership
    ///   config is re-read from storage, and the request's entries are appended;
    /// * entry present with a different term: respond `success: false` with a conflict hint taken
    ///   from the preceding entries (see the comments in the body).
    ///
    /// # Errors
    ///
    /// Propagates failures from `save_hard_state`, `update_membership`, and any storage call
    /// (the latter after passing through `map_fatal_storage_error`).
    #[tracing::instrument(
        level="trace", skip(self, msg),
        fields(term=msg.term, leader_id=msg.leader_id, prev_log_index=msg.prev_log_index, prev_log_term=msg.prev_log_term, leader_commit=msg.leader_commit),
    )]
    pub(super) async fn handle_append_entries_request(&mut self, msg: AppendEntriesRequest<D>) -> RaftResult<AppendEntriesResponse> {
        // A request from an older term is not honored.
        if msg.term < self.current_term {
            tracing::trace!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term");
            return Ok(AppendEntriesResponse {
                term: self.current_term,
                success: false,
                conflict_opt: None,
            });
        }

        self.update_next_election_timeout();
        let mut report_metrics = false;
        // The commit index is taken from the request as-is.
        self.commit_index = msg.leader_commit;

        // Adopt the request's term and persist the hard state when the term changed.
        if self.current_term != msg.term {
            self.update_current_term(msg.term, None);
            self.save_hard_state().await?;
            report_metrics = true;
        }

        // Track the sender as the current leader.
        if self.current_leader.as_ref() != Some(&msg.leader_id) {
            self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
            report_metrics = true;
        }

        // Any state other than follower / non-voter moves to follower.
        if !self.target_state.is_follower() && !self.target_state.is_non_voter() {
            self.set_target_state(State::Follower);
        }

        // Fast path: nothing precedes the payload, or the request's previous index/term are
        // exactly this node's last log index/term. No consistency check is needed.
        let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
        let msg_index_and_term_match =
            msg.prev_log_index == self.last_log_index && msg.prev_log_term == self.last_log_term;
        if msg_prev_index_is_min || msg_index_and_term_match {
            // An empty payload (heartbeat) skips the append and only applies committed entries.
            if !msg.entries.is_empty() {
                self.append_log_entries(&msg.entries).await?;
            }
            self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
            if report_metrics {
                self.report_metrics();
            }
            return Ok(AppendEntriesResponse {
                term: self.current_term,
                success: true,
                conflict_opt: None,
            });
        }

        tracing::trace!("begin log consistency check");

        // Fetch the single entry at `prev_log_index`. The `stop` bound is used as exclusive
        // elsewhere in this impl (`replicate_to_state_machine_if_needed` passes `index + 1` to
        // include `index`), so the range must end at `prev_log_index + 1`; ending it at
        // `prev_log_index` would request an empty range and the entry would never be found.
        let entries = self.storage.get_log_entries(msg.prev_log_index, msg.prev_log_index + 1).await.map_err(|err| self.map_fatal_storage_error(err))?;
        let target_entry = match entries.first() {
            Some(target_entry) => target_entry,
            None => {
                // Nothing stored at `prev_log_index`: report our last index/term as the hint.
                if report_metrics {
                    self.report_metrics();
                }
                return Ok(AppendEntriesResponse {
                    term: self.current_term,
                    success: false,
                    conflict_opt: Some(ConflictOpt { term: self.last_log_term, index: self.last_log_index }),
                });
            }
        };

        if target_entry.term == msg.prev_log_term {
            // Point of agreement found. Drop every local entry after it and refresh the
            // membership config from storage, since deleted entries may have carried one.
            if self.last_log_index > target_entry.index {
                self.storage.delete_logs_from(target_entry.index + 1, None).await.map_err(|err| self.map_fatal_storage_error(err))?;
                let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
                self.update_membership(membership)?;
            }
        } else {
            // Terms disagree at `prev_log_index`. Look at up to 50 entries before it and use the
            // first one returned by storage whose term equals `prev_log_term` as the conflict
            // hint; fall back to our last index/term when none matches.
            let start = msg.prev_log_index.saturating_sub(50);
            let old_entries = self.storage.get_log_entries(start, msg.prev_log_index).await.map_err(|err| self.map_fatal_storage_error(err))?;
            let conflict = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) {
                Some(entry) => ConflictOpt { term: entry.term, index: entry.index },
                None => ConflictOpt { term: self.last_log_term, index: self.last_log_index },
            };
            if report_metrics {
                self.report_metrics();
            }
            return Ok(AppendEntriesResponse {
                term: self.current_term,
                success: false,
                conflict_opt: Some(conflict),
            });
        }

        tracing::trace!("end log consistency check");

        // The log is consistent up to `prev_log_index`: append the payload and apply whatever
        // is now committed.
        self.append_log_entries(&msg.entries).await?;
        self.replicate_to_state_machine_if_needed(&mut report_metrics).await?;
        if report_metrics {
            self.report_metrics();
        }
        Ok(AppendEntriesResponse {
            term: self.current_term,
            success: true,
            conflict_opt: None,
        })
    }

    /// Write the given entries to the log via `replicate_to_log`.
    ///
    /// If the payload contains config-change entries, the membership of the last one is applied
    /// through `update_membership` before the entries are written. After a successful write,
    /// `last_log_index` / `last_log_term` are set from the final entry; an empty slice leaves
    /// them untouched.
    ///
    /// # Errors
    ///
    /// Propagates failures from `update_membership` and from storage (mapped through
    /// `map_fatal_storage_error`).
    #[tracing::instrument(level="trace", skip(self, entries))]
    async fn append_log_entries(&mut self, entries: &[Entry<D>]) -> RaftResult<()> {
        // Only the most recent config change in the payload matters.
        let last_conf_change = entries
            .iter()
            .filter_map(|ent| match &ent.payload {
                EntryPayload::ConfigChange(conf) => Some(conf),
                _ => None,
            })
            .last();
        if let Some(conf) = last_conf_change {
            tracing::debug!({membership=?conf}, "applying new membership config received from leader");
            self.update_membership(conf.membership.clone())?;
        }

        self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
        if let Some(entry) = entries.last() {
            self.last_log_index = entry.index;
            self.last_log_term = entry.term;
        }
        Ok(())
    }

    /// Apply newly committed log entries to the state machine, if there are any.
    ///
    /// Does nothing unless `commit_index` is greater than `last_applied`. Otherwise it reads the
    /// entries from `last_applied + 1` through `min(commit_index, last_log_index)` inclusive,
    /// advances `last_applied` to the last entry read (setting `*report_metrics` to `true`), and
    /// hands the `Normal` payloads to `replicate_to_state_machine`. Log compaction is triggered
    /// only when at least one `Normal` entry was applied.
    ///
    /// Note that `last_applied` is advanced before the state machine call is made.
    ///
    /// # Errors
    ///
    /// Propagates storage failures (mapped through `map_fatal_storage_error`).
    #[tracing::instrument(level="trace", skip(self, report_metrics))]
    async fn replicate_to_state_machine_if_needed(&mut self, report_metrics: &mut bool) -> RaftResult<()> {
        if self.commit_index > self.last_applied {
            // `+ 1` so that the entry at the (clamped) commit index itself is included.
            let stop = std::cmp::min(self.commit_index, self.last_log_index) + 1;
            let entries = self.storage.get_log_entries(self.last_applied + 1, stop).await.map_err(|err| self.map_fatal_storage_error(err))?;
            if let Some(entry) = entries.last() {
                self.last_applied = entry.index;
                *report_metrics = true;
            }

            // Only normal entries carry application data for the state machine.
            let data_entries: Vec<_> = entries
                .iter()
                .filter_map(|entry| match &entry.payload {
                    EntryPayload::Normal(inner) => Some((&entry.index, &inner.data)),
                    _ => None,
                })
                .collect();
            if data_entries.is_empty() {
                return Ok(());
            }
            self.storage.replicate_to_state_machine(&data_entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
            self.trigger_log_compaction_if_needed();
        }
        Ok(())
    }
}