openraft/engine/handler/leader_handler/mod.rs
1use crate::engine::handler::replication_handler::ReplicationHandler;
2use crate::engine::handler::replication_handler::SendNone;
3use crate::engine::Command;
4use crate::engine::EngineConfig;
5use crate::engine::EngineOutput;
6use crate::entry::RaftPayload;
7use crate::proposer::Leader;
8use crate::proposer::LeaderQuorumSet;
9use crate::raft_state::LogStateReader;
10use crate::type_config::alias::LogIdOf;
11use crate::AsyncRuntime;
12use crate::RaftLogId;
13use crate::RaftState;
14use crate::RaftTypeConfig;
15
16#[cfg(test)]
17mod append_entries_test;
18#[cfg(test)]
19mod send_heartbeat_test;
20
21/// Handle leader operations.
22///
23/// - Append new logs;
24/// - Change membership;
25/// - etc
26pub(crate) struct LeaderHandler<'x, C>
27where C: RaftTypeConfig
28{
29 pub(crate) config: &'x mut EngineConfig<C::NodeId>,
30 pub(crate) leader: &'x mut Leader<C, LeaderQuorumSet<C::NodeId>>,
31 pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
32 pub(crate) output: &'x mut EngineOutput<C>,
33}
34
35impl<C> LeaderHandler<'_, C>
36where C: RaftTypeConfig
37{
38 /// Append new log entries by a leader.
39 ///
40 /// Also Update effective membership if the payload contains
41 /// membership config.
42 ///
43 /// If there is a membership config log entry, the caller has to guarantee the previous one is
44 /// committed.
45 ///
46 /// TODO(xp): if vote indicates this node is not the leader, refuse append
47 #[tracing::instrument(level = "debug", skip(self, entries))]
48 pub(crate) fn leader_append_entries(&mut self, mut entries: Vec<C::Entry>) {
49 let l = entries.len();
50 if l == 0 {
51 return;
52 }
53
54 self.leader.assign_log_ids(&mut entries);
55
56 self.state.extend_log_ids_from_same_leader(&entries);
57
58 let mut membership_entry = None;
59 for entry in entries.iter() {
60 if let Some(m) = entry.get_membership() {
61 debug_assert!(
62 membership_entry.is_none(),
63 "only one membership entry is allowed in a batch"
64 );
65 membership_entry = Some((entry.get_log_id().clone(), m.clone()));
66 }
67 }
68
69 // TODO: In future implementations with asynchronous IO,
70 // ensure logs are not written until the vote is committed
71 // to maintain consistency.
72 // ---
73 // Currently, IO requests to `RaftLogStorage` are executed
74 // within the `RaftCore` task. This means an `AppendLog` request
75 // won't be submitted to `RaftLogStorage` until `save_vote()` completes,
76 // which ensures consistency.
77 // ---
78 // However, when `RaftLogStorage` is moved to a separate task,
79 // `RaftCore` will communicate with `RaftLogStorage` via a channel.
80 // This change could result in `AppendLog` requests being submitted
81 // before the previous `save_vote()` request is finished.
82 // ---
83 // This scenario creates a risk where a log entry becomes visible and
84 // is replicated by `ReplicationCore` to other nodes before the vote
85 // is flushed to disk. If the vote isn't flushed and the server restarts,
86 // the vote could revert to a previous state. This could allow a new leader
87 // to be elected with a smaller vote (term), breaking consistency.
88 self.output.push_command(Command::AppendInputEntries {
89 // A leader should always use the leader's vote.
90 // It is allowed to be different from local vote.
91 vote: self.leader.vote.clone(),
92 entries,
93 });
94
95 let mut rh = self.replication_handler();
96
97 // Since this entry, the condition to commit has been changed.
98 // But we only need to commit in the new membership config.
99 // Because any quorum in the new one intersect with one in the previous membership config.
100 if let Some((log_id, m)) = membership_entry {
101 rh.append_membership(&log_id, &m);
102 }
103
104 rh.initiate_replication(SendNone::False);
105 }
106
107 #[tracing::instrument(level = "debug", skip_all)]
108 pub(crate) fn send_heartbeat(&mut self) {
109 let mut rh = self.replication_handler();
110 rh.initiate_replication(SendNone::True);
111 }
112
113 /// Get the log id for a linearizable read.
114 ///
115 /// See: [Read Operation](crate::docs::protocol::read)
116 pub(crate) fn get_read_log_id(&self) -> Option<LogIdOf<C>> {
117 let committed = self.state.committed().cloned();
118 // noop log id is the first log this leader proposed.
119 std::cmp::max(self.leader.noop_log_id.clone(), committed)
120 }
121
122 pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<C> {
123 ReplicationHandler {
124 config: self.config,
125 leader: self.leader,
126 state: self.state,
127 output: self.output,
128 }
129 }
130}