nodedb_raft/node/
membership.rs1use tracing::info;
10
11use crate::state::NodeRole;
12use crate::storage::LogStorage;
13
14use super::core::RaftNode;
15
16impl<S: LogStorage> RaftNode<S> {
17 pub(super) fn set_voters(&mut self, new_voters: Vec<u64>) {
23 let last_index = self.log.last_index();
24
25 if let Some(ref mut leader) = self.leader_state {
26 for &peer in &new_voters {
27 if !self.config.peers.contains(&peer) && !self.config.learners.contains(&peer) {
28 leader.add_peer(peer, last_index);
29 info!(
30 node = self.config.node_id,
31 group = self.config.group_id,
32 peer,
33 "added voter to leader tracking"
34 );
35 }
36 }
37 for &peer in &self.config.peers {
38 if !new_voters.contains(&peer) && !self.config.learners.contains(&peer) {
39 leader.remove_peer(peer);
40 info!(
41 node = self.config.node_id,
42 group = self.config.group_id,
43 peer,
44 "removed voter from leader tracking"
45 );
46 }
47 }
48 }
49
50 self.config.peers = new_voters;
51 }
52
53 pub fn add_peer(&mut self, peer: u64) {
60 if peer == self.config.node_id
61 || self.config.peers.contains(&peer)
62 || self.config.learners.contains(&peer)
63 {
64 return;
65 }
66 let mut new_peers = self.config.peers.clone();
67 new_peers.push(peer);
68 self.set_voters(new_peers);
69 }
70
71 pub fn remove_peer(&mut self, peer: u64) {
73 if !self.config.peers.contains(&peer) {
74 return;
75 }
76 let new_peers: Vec<u64> = self
77 .config
78 .peers
79 .iter()
80 .copied()
81 .filter(|&id| id != peer)
82 .collect();
83 self.set_voters(new_peers);
84 }
85
86 pub fn add_learner(&mut self, peer: u64) {
95 if peer == self.config.node_id
96 || self.config.peers.contains(&peer)
97 || self.config.learners.contains(&peer)
98 {
99 return;
100 }
101
102 let last_index = self.log.last_index();
103 if let Some(ref mut leader) = self.leader_state {
104 leader.add_peer(peer, last_index);
105 }
106 self.config.learners.push(peer);
107
108 info!(
109 node = self.config.node_id,
110 group = self.config.group_id,
111 peer,
112 "added learner peer"
113 );
114 }
115
116 pub fn remove_learner(&mut self, peer: u64) {
118 if !self.config.learners.contains(&peer) {
119 return;
120 }
121 if let Some(ref mut leader) = self.leader_state {
122 leader.remove_peer(peer);
123 }
124 self.config.learners.retain(|&id| id != peer);
125
126 info!(
127 node = self.config.node_id,
128 group = self.config.group_id,
129 peer,
130 "removed learner peer"
131 );
132 }
133
134 pub fn promote_learner(&mut self, peer: u64) -> bool {
144 if !self.config.learners.contains(&peer) {
145 return false;
146 }
147 self.config.learners.retain(|&id| id != peer);
148 if !self.config.peers.contains(&peer) {
149 self.config.peers.push(peer);
150 }
151
152 info!(
153 node = self.config.node_id,
154 group = self.config.group_id,
155 peer,
156 "promoted learner to voter"
157 );
158 true
159 }
160
161 pub fn promote_self_to_voter(&mut self) {
167 if self.role == NodeRole::Learner {
168 self.role = NodeRole::Follower;
169 self.config.starts_as_learner = false;
170 info!(
172 node = self.config.node_id,
173 group = self.config.group_id,
174 "promoted self from learner to follower"
175 );
176 }
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use crate::node::config::RaftConfig;
183 use crate::node::core::RaftNode;
184 use crate::state::NodeRole;
185 use crate::storage::MemStorage;
186 use std::time::{Duration, Instant};
187
188 fn cfg(node_id: u64, peers: Vec<u64>) -> RaftConfig {
189 RaftConfig {
190 node_id,
191 group_id: 0,
192 peers,
193 learners: vec![],
194 starts_as_learner: false,
195 election_timeout_min: Duration::from_millis(150),
196 election_timeout_max: Duration::from_millis(300),
197 heartbeat_interval: Duration::from_millis(50),
198 }
199 }
200
201 fn force_leader(node: &mut RaftNode<MemStorage>) {
202 node.election_deadline_override(Instant::now() - Duration::from_millis(1));
203 node.tick();
204 let _ = node.take_ready();
206 }
208
209 #[test]
210 fn add_learner_does_not_change_quorum() {
211 let mut node = RaftNode::new(cfg(1, vec![2, 3]), MemStorage::new());
213 assert_eq!(node.config.quorum(), 2);
214
215 node.add_learner(4);
216 assert_eq!(node.learners(), &[4]);
217 assert_eq!(node.config.quorum(), 2);
219 assert_eq!(node.config.cluster_size(), 3);
220 }
221
222 #[test]
223 fn promote_learner_grows_quorum() {
224 let mut node = RaftNode::new(cfg(1, vec![2]), MemStorage::new());
225 assert_eq!(node.config.quorum(), 2); node.add_learner(3);
228 assert_eq!(node.config.quorum(), 2);
229
230 let promoted = node.promote_learner(3);
231 assert!(promoted);
232 assert_eq!(node.voters(), &[2, 3]);
233 assert!(node.learners().is_empty());
234 assert_eq!(node.config.cluster_size(), 3);
236 assert_eq!(node.config.quorum(), 2);
237 }
238
239 #[test]
240 fn remove_learner_drops_peer() {
241 let mut node = RaftNode::new(cfg(1, vec![2]), MemStorage::new());
242 node.add_learner(3);
243 assert_eq!(node.learners(), &[3]);
244 node.remove_learner(3);
245 assert!(node.learners().is_empty());
246 }
247
248 #[test]
249 fn add_learner_on_leader_starts_tracking() {
250 let mut node = RaftNode::new(cfg(1, vec![]), MemStorage::new());
252 force_leader(&mut node);
253 assert_eq!(node.role(), NodeRole::Leader);
254
255 let _ = node.propose(b"x".to_vec()).unwrap();
257 let _ = node.take_ready();
258
259 node.add_learner(2);
260 assert_eq!(node.match_index_for(2), Some(0));
262
263 node.replicate_to_all();
265 let ready = node.take_ready();
266 let targets: Vec<u64> = ready.messages.iter().map(|(p, _)| *p).collect();
267 assert!(
268 targets.contains(&2),
269 "learner should receive AE, got {targets:?}"
270 );
271 }
272
273 #[test]
274 fn promote_self_flips_role() {
275 let mut c = cfg(2, vec![1]);
276 c.starts_as_learner = true;
277 let mut node = RaftNode::new(c, MemStorage::new());
278 assert_eq!(node.role(), NodeRole::Learner);
279 node.promote_self_to_voter();
280 assert_eq!(node.role(), NodeRole::Follower);
281 }
282}