nodedb_raft/node/
membership.rs1use tracing::info;
12
13use crate::state::NodeRole;
14use crate::storage::LogStorage;
15
16use super::core::RaftNode;
17
18impl<S: LogStorage> RaftNode<S> {
19 pub(super) fn set_voters(&mut self, new_voters: Vec<u64>) {
25 let last_index = self.log.last_index();
26
27 if let Some(ref mut leader) = self.leader_state {
28 for &peer in &new_voters {
29 if !self.config.peers.contains(&peer) && !self.config.learners.contains(&peer) {
30 leader.add_peer(peer, last_index);
31 info!(
32 node = self.config.node_id,
33 group = self.config.group_id,
34 peer,
35 "added voter to leader tracking"
36 );
37 }
38 }
39 for &peer in &self.config.peers {
40 if !new_voters.contains(&peer) && !self.config.learners.contains(&peer) {
41 leader.remove_peer(peer);
42 info!(
43 node = self.config.node_id,
44 group = self.config.group_id,
45 peer,
46 "removed voter from leader tracking"
47 );
48 }
49 }
50 }
51
52 self.config.peers = new_voters;
53 }
54
55 pub fn add_peer(&mut self, peer: u64) {
62 if peer == self.config.node_id
63 || self.config.peers.contains(&peer)
64 || self.config.learners.contains(&peer)
65 {
66 return;
67 }
68 let mut new_peers = self.config.peers.clone();
69 new_peers.push(peer);
70 self.set_voters(new_peers);
71 }
72
73 pub fn remove_peer(&mut self, peer: u64) {
75 if !self.config.peers.contains(&peer) {
76 return;
77 }
78 let new_peers: Vec<u64> = self
79 .config
80 .peers
81 .iter()
82 .copied()
83 .filter(|&id| id != peer)
84 .collect();
85 self.set_voters(new_peers);
86 }
87
88 pub fn add_learner(&mut self, peer: u64) {
97 if peer == self.config.node_id
98 || self.config.peers.contains(&peer)
99 || self.config.learners.contains(&peer)
100 {
101 return;
102 }
103
104 let last_index = self.log.last_index();
105 if let Some(ref mut leader) = self.leader_state {
106 leader.add_peer(peer, last_index);
107 }
108 self.config.learners.push(peer);
109
110 info!(
111 node = self.config.node_id,
112 group = self.config.group_id,
113 peer,
114 "added learner peer"
115 );
116 }
117
118 pub fn remove_learner(&mut self, peer: u64) {
120 if !self.config.learners.contains(&peer) {
121 return;
122 }
123 if let Some(ref mut leader) = self.leader_state {
124 leader.remove_peer(peer);
125 }
126 self.config.learners.retain(|&id| id != peer);
127
128 info!(
129 node = self.config.node_id,
130 group = self.config.group_id,
131 peer,
132 "removed learner peer"
133 );
134 }
135
136 pub fn promote_learner(&mut self, peer: u64) -> bool {
146 if !self.config.learners.contains(&peer) {
147 return false;
148 }
149 self.config.learners.retain(|&id| id != peer);
150 if !self.config.peers.contains(&peer) {
151 self.config.peers.push(peer);
152 }
153
154 info!(
155 node = self.config.node_id,
156 group = self.config.group_id,
157 peer,
158 "promoted learner to voter"
159 );
160 true
161 }
162
163 pub fn promote_self_to_voter(&mut self) {
169 if self.role == NodeRole::Learner {
170 self.role = NodeRole::Follower;
171 self.config.starts_as_learner = false;
172 info!(
174 node = self.config.node_id,
175 group = self.config.group_id,
176 "promoted self from learner to follower"
177 );
178 }
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use crate::node::config::RaftConfig;
185 use crate::node::core::RaftNode;
186 use crate::state::NodeRole;
187 use crate::storage::MemStorage;
188 use std::time::{Duration, Instant};
189
190 fn cfg(node_id: u64, peers: Vec<u64>) -> RaftConfig {
191 RaftConfig {
192 node_id,
193 group_id: 0,
194 peers,
195 learners: vec![],
196 observers: vec![],
197 starts_as_learner: false,
198 starts_as_observer: false,
199 election_timeout_min: Duration::from_millis(150),
200 election_timeout_max: Duration::from_millis(300),
201 heartbeat_interval: Duration::from_millis(50),
202 }
203 }
204
205 fn force_leader(node: &mut RaftNode<MemStorage>) {
206 node.election_deadline_override(Instant::now() - Duration::from_millis(1));
207 node.tick();
208 let _ = node.take_ready();
210 }
212
213 #[test]
214 fn add_learner_does_not_change_quorum() {
215 let mut node = RaftNode::new(cfg(1, vec![2, 3]), MemStorage::new());
217 assert_eq!(node.config.quorum(), 2);
218
219 node.add_learner(4);
220 assert_eq!(node.learners(), &[4]);
221 assert_eq!(node.config.quorum(), 2);
223 assert_eq!(node.config.cluster_size(), 3);
224 }
225
226 #[test]
227 fn promote_learner_grows_quorum() {
228 let mut node = RaftNode::new(cfg(1, vec![2]), MemStorage::new());
229 assert_eq!(node.config.quorum(), 2); node.add_learner(3);
232 assert_eq!(node.config.quorum(), 2);
233
234 let promoted = node.promote_learner(3);
235 assert!(promoted);
236 assert_eq!(node.voters(), &[2, 3]);
237 assert!(node.learners().is_empty());
238 assert_eq!(node.config.cluster_size(), 3);
240 assert_eq!(node.config.quorum(), 2);
241 }
242
243 #[test]
244 fn remove_learner_drops_peer() {
245 let mut node = RaftNode::new(cfg(1, vec![2]), MemStorage::new());
246 node.add_learner(3);
247 assert_eq!(node.learners(), &[3]);
248 node.remove_learner(3);
249 assert!(node.learners().is_empty());
250 }
251
252 #[test]
253 fn add_learner_on_leader_starts_tracking() {
254 let mut node = RaftNode::new(cfg(1, vec![]), MemStorage::new());
256 force_leader(&mut node);
257 assert_eq!(node.role(), NodeRole::Leader);
258
259 let _ = node.propose(b"x".to_vec()).unwrap();
261 let _ = node.take_ready();
262
263 node.add_learner(2);
264 assert_eq!(node.match_index_for(2), Some(0));
266
267 node.replicate_to_all();
269 let ready = node.take_ready();
270 let targets: Vec<u64> = ready.messages.iter().map(|(p, _)| *p).collect();
271 assert!(
272 targets.contains(&2),
273 "learner should receive AE, got {targets:?}"
274 );
275 }
276
277 #[test]
278 fn promote_self_flips_role() {
279 let mut c = cfg(2, vec![1]);
280 c.starts_as_learner = true;
281 let mut node = RaftNode::new(c, MemStorage::new());
282 assert_eq!(node.role(), NodeRole::Learner);
283 node.promote_self_to_voter();
284 assert_eq!(node.role(), NodeRole::Follower);
285 }
286}