1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::replication_handler::SendNone;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::LogIdOf;
use crate::AsyncRuntime;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
#[cfg(test)]
mod append_entries_test;
#[cfg(test)]
mod send_heartbeat_test;
/// Handle leader operations.
///
/// - Append new logs;
/// - Change membership;
/// - etc
pub(crate) struct LeaderHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader: &'x mut Leader<C, LeaderQuorumSet<C::NodeId>>,
pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
pub(crate) output: &'x mut EngineOutput<C>,
}
impl<'x, C> LeaderHandler<'x, C>
where C: RaftTypeConfig
{
/// Append new log entries by a leader.
///
/// Also Update effective membership if the payload contains
/// membership config.
///
/// If there is a membership config log entry, the caller has to guarantee the previous one is
/// committed.
///
/// TODO(xp): if vote indicates this node is not the leader, refuse append
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn leader_append_entries(&mut self, mut entries: Vec<C::Entry>) {
let l = entries.len();
if l == 0 {
return;
}
self.leader.assign_log_ids(&mut entries);
self.state.extend_log_ids_from_same_leader(&entries);
let mut membership_entry = None;
for entry in entries.iter() {
if let Some(m) = entry.get_membership() {
debug_assert!(
membership_entry.is_none(),
"only one membership entry is allowed in a batch"
);
membership_entry = Some((*entry.get_log_id(), m.clone()));
}
}
// TODO: In future implementations with asynchronous IO,
// ensure logs are not written until the vote is committed
// to maintain consistency.
// ---
// Currently, IO requests to `RaftLogStorage` are executed
// within the `RaftCore` task. This means an `AppendLog` request
// won't be submitted to `RaftLogStorage` until `save_vote()` completes,
// which ensures consistency.
// ---
// However, when `RaftLogStorage` is moved to a separate task,
// `RaftCore` will communicate with `RaftLogStorage` via a channel.
// This change could result in `AppendLog` requests being submitted
// before the previous `save_vote()` request is finished.
// ---
// This scenario creates a risk where a log entry becomes visible and
// is replicated by `ReplicationCore` to other nodes before the vote
// is flushed to disk. If the vote isn't flushed and the server restarts,
// the vote could revert to a previous state. This could allow a new leader
// to be elected with a smaller vote (term), breaking consistency.
self.output.push_command(Command::AppendInputEntries {
// A leader should always use the leader's vote.
// It is allowed to be different from local vote.
vote: self.leader.vote,
entries,
});
let mut rh = self.replication_handler();
// Since this entry, the condition to commit has been changed.
// But we only need to commit in the new membership config.
// Because any quorum in the new one intersect with one in the previous membership config.
if let Some((log_id, m)) = membership_entry {
rh.append_membership(&log_id, &m);
}
rh.initiate_replication(SendNone::False);
}
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn send_heartbeat(&mut self) -> () {
let mut rh = self.replication_handler();
rh.initiate_replication(SendNone::True);
}
/// Get the log id for a linearizable read.
///
/// See: [Read Operation](crate::docs::protocol::read)
pub(crate) fn get_read_log_id(&self) -> Option<LogIdOf<C>> {
let committed = self.state.committed().copied();
// noop log id is the first log this leader proposed.
std::cmp::max(self.leader.noop_log_id, committed)
}
pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<C> {
ReplicationHandler {
config: self.config,
leader: self.leader,
state: self.state,
output: self.output,
}
}
}