openraft/engine/handler/leader_handler/
mod.rs

1use crate::engine::handler::replication_handler::ReplicationHandler;
2use crate::engine::handler::replication_handler::SendNone;
3use crate::engine::Command;
4use crate::engine::EngineConfig;
5use crate::engine::EngineOutput;
6use crate::entry::RaftPayload;
7use crate::proposer::Leader;
8use crate::proposer::LeaderQuorumSet;
9use crate::raft_state::LogStateReader;
10use crate::type_config::alias::LogIdOf;
11use crate::AsyncRuntime;
12use crate::RaftLogId;
13use crate::RaftState;
14use crate::RaftTypeConfig;
15
16#[cfg(test)]
17mod append_entries_test;
18#[cfg(test)]
19mod send_heartbeat_test;
20
21/// Handle leader operations.
22///
23/// - Append new logs;
24/// - Change membership;
25/// - etc
26pub(crate) struct LeaderHandler<'x, C>
27where C: RaftTypeConfig
28{
29    pub(crate) config: &'x mut EngineConfig<C::NodeId>,
30    pub(crate) leader: &'x mut Leader<C, LeaderQuorumSet<C::NodeId>>,
31    pub(crate) state: &'x mut RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>,
32    pub(crate) output: &'x mut EngineOutput<C>,
33}
34
35impl<C> LeaderHandler<'_, C>
36where C: RaftTypeConfig
37{
38    /// Append new log entries by a leader.
39    ///
40    /// Also Update effective membership if the payload contains
41    /// membership config.
42    ///
43    /// If there is a membership config log entry, the caller has to guarantee the previous one is
44    /// committed.
45    ///
46    /// TODO(xp): if vote indicates this node is not the leader, refuse append
47    #[tracing::instrument(level = "debug", skip(self, entries))]
48    pub(crate) fn leader_append_entries(&mut self, mut entries: Vec<C::Entry>) {
49        let l = entries.len();
50        if l == 0 {
51            return;
52        }
53
54        self.leader.assign_log_ids(&mut entries);
55
56        self.state.extend_log_ids_from_same_leader(&entries);
57
58        let mut membership_entry = None;
59        for entry in entries.iter() {
60            if let Some(m) = entry.get_membership() {
61                debug_assert!(
62                    membership_entry.is_none(),
63                    "only one membership entry is allowed in a batch"
64                );
65                membership_entry = Some((entry.get_log_id().clone(), m.clone()));
66            }
67        }
68
69        // TODO: In future implementations with asynchronous IO,
70        //       ensure logs are not written until the vote is committed
71        //       to maintain consistency.
72        //       ---
73        //       Currently, IO requests to `RaftLogStorage` are executed
74        //       within the `RaftCore` task. This means an `AppendLog` request
75        //       won't be submitted to `RaftLogStorage` until `save_vote()` completes,
76        //       which ensures consistency.
77        //       ---
78        //       However, when `RaftLogStorage` is moved to a separate task,
79        //       `RaftCore` will communicate with `RaftLogStorage` via a channel.
80        //       This change could result in `AppendLog` requests being submitted
81        //       before the previous `save_vote()` request is finished.
82        //       ---
83        //       This scenario creates a risk where a log entry becomes visible and
84        //       is replicated by `ReplicationCore` to other nodes before the vote
85        //       is flushed to disk. If the vote isn't flushed and the server restarts,
86        //       the vote could revert to a previous state. This could allow a new leader
87        //       to be elected with a smaller vote (term), breaking consistency.
88        self.output.push_command(Command::AppendInputEntries {
89            // A leader should always use the leader's vote.
90            // It is allowed to be different from local vote.
91            vote: self.leader.vote.clone(),
92            entries,
93        });
94
95        let mut rh = self.replication_handler();
96
97        // Since this entry, the condition to commit has been changed.
98        // But we only need to commit in the new membership config.
99        // Because any quorum in the new one intersect with one in the previous membership config.
100        if let Some((log_id, m)) = membership_entry {
101            rh.append_membership(&log_id, &m);
102        }
103
104        rh.initiate_replication(SendNone::False);
105    }
106
107    #[tracing::instrument(level = "debug", skip_all)]
108    pub(crate) fn send_heartbeat(&mut self) {
109        let mut rh = self.replication_handler();
110        rh.initiate_replication(SendNone::True);
111    }
112
113    /// Get the log id for a linearizable read.
114    ///
115    /// See: [Read Operation](crate::docs::protocol::read)
116    pub(crate) fn get_read_log_id(&self) -> Option<LogIdOf<C>> {
117        let committed = self.state.committed().cloned();
118        // noop log id is the first log this leader proposed.
119        std::cmp::max(self.leader.noop_log_id.clone(), committed)
120    }
121
122    pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<C> {
123        ReplicationHandler {
124            config: self.config,
125            leader: self.leader,
126            state: self.state,
127            output: self.output,
128        }
129    }
130}