1use tracing::{debug, info, warn};
4
5use crate::message::{
6 AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
7 RequestVoteRequest, RequestVoteResponse,
8};
9use crate::state::NodeRole;
10use crate::storage::LogStorage;
11
12use super::core::RaftNode;
13
14impl<S: LogStorage> RaftNode<S> {
15 pub fn handle_append_entries(&mut self, req: &AppendEntriesRequest) -> AppendEntriesResponse {
17 if req.term < self.hard_state.current_term {
18 return AppendEntriesResponse {
19 term: self.hard_state.current_term,
20 success: false,
21 last_log_index: self.log.last_index(),
22 };
23 }
24
25 if req.term > self.hard_state.current_term || self.role == NodeRole::Candidate {
26 self.become_follower(req.term);
28 }
29
30 self.leader_id = req.leader_id;
31 self.reset_election_timeout();
32
33 if req.prev_log_index > 0 {
35 match self.log.term_at(req.prev_log_index) {
36 Some(term) if term == req.prev_log_term => {}
37 _ => {
38 return AppendEntriesResponse {
39 term: self.hard_state.current_term,
40 success: false,
41 last_log_index: self.log.last_index(),
42 };
43 }
44 }
45 }
46
47 if let Err(e) = self.log.append_entries(req.prev_log_index, &req.entries) {
48 warn!(group = self.config.group_id, error = %e, "append_entries failed");
49 return AppendEntriesResponse {
50 term: self.hard_state.current_term,
51 success: false,
52 last_log_index: self.log.last_index(),
53 };
54 }
55
56 if req.leader_commit > self.volatile.commit_index {
57 self.volatile.commit_index = req.leader_commit.min(self.log.last_index());
58 self.collect_committed_entries();
59 }
60
61 AppendEntriesResponse {
62 term: self.hard_state.current_term,
63 success: true,
64 last_log_index: self.log.last_index(),
65 }
66 }
67
68 pub fn handle_request_vote(&mut self, req: &RequestVoteRequest) -> RequestVoteResponse {
74 if self.role == NodeRole::Learner {
75 return RequestVoteResponse {
76 term: self.hard_state.current_term,
77 vote_granted: false,
78 };
79 }
80
81 if req.term > self.hard_state.current_term {
82 self.become_follower(req.term);
83 }
84
85 if req.term < self.hard_state.current_term {
86 return RequestVoteResponse {
87 term: self.hard_state.current_term,
88 vote_granted: false,
89 };
90 }
91
92 let voted_for = self.hard_state.voted_for;
93 let can_vote = voted_for == 0 || voted_for == req.candidate_id;
94
95 let log_ok = req.last_log_term > self.log.last_term()
96 || (req.last_log_term == self.log.last_term()
97 && req.last_log_index >= self.log.last_index());
98
99 if can_vote && log_ok {
100 self.hard_state.voted_for = req.candidate_id;
101 self.persist_hard_state();
102 self.reset_election_timeout();
103
104 debug!(
105 node = self.config.node_id,
106 group = self.config.group_id,
107 candidate = req.candidate_id,
108 term = req.term,
109 "granted vote"
110 );
111
112 RequestVoteResponse {
113 term: self.hard_state.current_term,
114 vote_granted: true,
115 }
116 } else {
117 RequestVoteResponse {
118 term: self.hard_state.current_term,
119 vote_granted: false,
120 }
121 }
122 }
123
124 pub fn handle_append_entries_response(&mut self, peer: u64, resp: &AppendEntriesResponse) {
131 if resp.term > self.hard_state.current_term {
132 self.become_follower(resp.term);
133 return;
134 }
135
136 if self.role != NodeRole::Leader {
137 return;
138 }
139
140 let peer_is_voter = self.config.peers.contains(&peer);
141
142 let leader = match self.leader_state.as_mut() {
143 Some(ls) => ls,
144 None => return,
145 };
146
147 if resp.success {
148 let new_match = resp.last_log_index;
149 if new_match > leader.match_index_for(peer) {
150 leader.set_match_index(peer, new_match);
151 leader.set_next_index(peer, new_match + 1);
152 }
153 if peer_is_voter {
154 self.try_advance_commit_index();
155 }
156 } else {
157 let new_next = resp.last_log_index + 1;
158 let current_next = leader.next_index_for(peer);
159 if new_next < current_next {
160 leader.set_next_index(peer, new_next.max(1));
161 } else {
162 leader.set_next_index(peer, current_next.saturating_sub(1).max(1));
163 }
164 self.send_append_entries(peer);
165 }
166 }
167
168 pub fn handle_request_vote_response(&mut self, _peer: u64, resp: &RequestVoteResponse) {
170 if resp.term > self.hard_state.current_term {
171 self.become_follower(resp.term);
172 return;
173 }
174
175 if self.role != NodeRole::Candidate {
176 return;
177 }
178
179 if resp.vote_granted {
180 self.votes_received.push(resp.term);
181 let vote_count = self.votes_received.len() + 1; if vote_count >= self.config.quorum() {
184 self.become_leader();
185 }
186 }
187 }
188
189 pub fn handle_install_snapshot(
195 &mut self,
196 req: &InstallSnapshotRequest,
197 ) -> InstallSnapshotResponse {
198 if req.term < self.hard_state.current_term {
199 return InstallSnapshotResponse {
200 term: self.hard_state.current_term,
201 };
202 }
203
204 if req.term > self.hard_state.current_term {
205 self.become_follower(req.term);
206 }
207
208 self.leader_id = req.leader_id;
209 self.reset_election_timeout();
210
211 if req.done && req.last_included_index > self.log.snapshot_index() {
212 info!(
213 node = self.config.node_id,
214 group = self.config.group_id,
215 snapshot_index = req.last_included_index,
216 snapshot_term = req.last_included_term,
217 "applying installed snapshot"
218 );
219
220 self.log
221 .apply_snapshot(req.last_included_index, req.last_included_term);
222
223 if self.volatile.commit_index < req.last_included_index {
224 self.volatile.commit_index = req.last_included_index;
225 }
226 if self.volatile.last_applied < req.last_included_index {
227 self.volatile.last_applied = req.last_included_index;
228 }
229 }
230
231 InstallSnapshotResponse {
232 term: self.hard_state.current_term,
233 }
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use std::time::{Duration, Instant};
240
241 use crate::message::{AppendEntriesRequest, LogEntry, RequestVoteRequest, RequestVoteResponse};
242 use crate::node::config::RaftConfig;
243 use crate::state::NodeRole;
244 use crate::storage::MemStorage;
245
246 use super::*;
247
248 fn test_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
249 RaftConfig {
250 node_id,
251 group_id: 1,
252 peers,
253 learners: vec![],
254 starts_as_learner: false,
255 election_timeout_min: Duration::from_millis(150),
256 election_timeout_max: Duration::from_millis(300),
257 heartbeat_interval: Duration::from_millis(50),
258 }
259 }
260
261 #[test]
262 fn follower_rejects_old_term() {
263 let config = test_config(1, vec![2, 3]);
264 let mut node = RaftNode::new(config, MemStorage::new());
265 node.hard_state.current_term = 5;
266
267 let req = AppendEntriesRequest {
268 term: 3,
269 leader_id: 2,
270 prev_log_index: 0,
271 prev_log_term: 0,
272 entries: vec![],
273 leader_commit: 0,
274 group_id: 1,
275 };
276
277 let resp = node.handle_append_entries(&req);
278 assert!(!resp.success);
279 assert_eq!(resp.term, 5);
280 }
281
282 #[test]
283 fn follower_accepts_valid_append() {
284 let config = test_config(1, vec![2, 3]);
285 let mut node = RaftNode::new(config, MemStorage::new());
286
287 let req = AppendEntriesRequest {
288 term: 1,
289 leader_id: 2,
290 prev_log_index: 0,
291 prev_log_term: 0,
292 entries: vec![
293 LogEntry {
294 term: 1,
295 index: 1,
296 data: b"a".to_vec(),
297 },
298 LogEntry {
299 term: 1,
300 index: 2,
301 data: b"b".to_vec(),
302 },
303 ],
304 leader_commit: 1,
305 group_id: 1,
306 };
307
308 let resp = node.handle_append_entries(&req);
309 assert!(resp.success);
310 assert_eq!(resp.last_log_index, 2);
311 assert_eq!(node.commit_index(), 1);
312 assert_eq!(node.leader_id(), 2);
313 }
314
315 #[test]
316 fn vote_grant_and_reject() {
317 let config = test_config(1, vec![2, 3]);
318 let mut node = RaftNode::new(config, MemStorage::new());
319
320 let req = RequestVoteRequest {
321 term: 1,
322 candidate_id: 2,
323 last_log_index: 0,
324 last_log_term: 0,
325 group_id: 1,
326 };
327 let resp = node.handle_request_vote(&req);
328 assert!(resp.vote_granted);
329
330 let req2 = RequestVoteRequest {
331 term: 1,
332 candidate_id: 3,
333 last_log_index: 0,
334 last_log_term: 0,
335 group_id: 1,
336 };
337 let resp2 = node.handle_request_vote(&req2);
338 assert!(!resp2.vote_granted);
339 }
340
341 #[test]
342 fn learner_rejects_vote_request() {
343 let mut config = test_config(2, vec![1]);
344 config.starts_as_learner = true;
345 let mut node = RaftNode::new(config, MemStorage::new());
346 assert_eq!(node.role(), NodeRole::Learner);
347
348 let req = RequestVoteRequest {
349 term: 5,
350 candidate_id: 1,
351 last_log_index: 10,
352 last_log_term: 4,
353 group_id: 1,
354 };
355 let resp = node.handle_request_vote(&req);
356 assert!(
357 !resp.vote_granted,
358 "learner must never grant a vote, got {resp:?}"
359 );
360 }
361
362 #[test]
363 fn learner_accepts_append_entries_and_stays_learner() {
364 let mut config = test_config(2, vec![1]);
365 config.starts_as_learner = true;
366 let mut node = RaftNode::new(config, MemStorage::new());
367
368 let req = AppendEntriesRequest {
369 term: 1,
370 leader_id: 1,
371 prev_log_index: 0,
372 prev_log_term: 0,
373 entries: vec![LogEntry {
374 term: 1,
375 index: 1,
376 data: b"x".to_vec(),
377 }],
378 leader_commit: 1,
379 group_id: 1,
380 };
381
382 let resp = node.handle_append_entries(&req);
383 assert!(resp.success);
384 assert_eq!(node.commit_index(), 1);
385 assert_eq!(node.role(), NodeRole::Learner);
387 assert_eq!(node.leader_id(), 1);
388 }
389
390 #[test]
394 fn learner_ae_response_does_not_drive_commit() {
395 let mut config = test_config(1, vec![2, 3]);
398 config.learners = vec![4];
399 let mut node = RaftNode::new(config, MemStorage::new());
400
401 node.election_deadline_override(Instant::now() - Duration::from_millis(1));
403 node.tick();
404 let yes = RequestVoteResponse {
406 term: 1,
407 vote_granted: true,
408 };
409 node.handle_request_vote_response(2, &yes);
410 assert_eq!(node.role(), NodeRole::Leader);
411 let _ = node.take_ready();
412
413 let idx = node.propose(b"cmd".to_vec()).unwrap();
415 assert_eq!(idx, 2);
416 let _ = node.take_ready();
417
418 let baseline_commit = node.commit_index();
420 assert!(baseline_commit < 2);
421
422 let ae_ok = AppendEntriesResponse {
424 term: 1,
425 success: true,
426 last_log_index: 2,
427 };
428 node.handle_append_entries_response(4, &ae_ok);
429 assert_eq!(
430 node.commit_index(),
431 baseline_commit,
432 "learner ACK must not contribute to commit quorum"
433 );
434
435 node.handle_append_entries_response(2, &ae_ok);
437 assert_eq!(node.commit_index(), 2);
438 }
439
440 #[test]
441 fn three_node_election() {
442 let config1 = test_config(1, vec![2, 3]);
443 let config2 = test_config(2, vec![1, 3]);
444 let config3 = test_config(3, vec![1, 2]);
445
446 let mut node1 = RaftNode::new(config1, MemStorage::new());
447 let mut node2 = RaftNode::new(config2, MemStorage::new());
448 let mut node3 = RaftNode::new(config3, MemStorage::new());
449
450 node1.election_deadline = Instant::now() - Duration::from_millis(1);
451 node1.tick();
452 assert_eq!(node1.role(), NodeRole::Candidate);
453
454 let ready = node1.take_ready();
455 assert_eq!(ready.vote_requests.len(), 2);
456
457 let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
458 let resp3 = node3.handle_request_vote(&ready.vote_requests[1].1);
459 assert!(resp2.vote_granted);
460 assert!(resp3.vote_granted);
461
462 node1.handle_request_vote_response(2, &resp2);
463 assert_eq!(node1.role(), NodeRole::Leader);
464 }
465
466 #[test]
467 fn three_node_replication() {
468 let config1 = test_config(1, vec![2, 3]);
469 let config2 = test_config(2, vec![1, 3]);
470
471 let mut node1 = RaftNode::new(config1, MemStorage::new());
472 let mut node2 = RaftNode::new(config2, MemStorage::new());
473
474 node1.election_deadline = Instant::now() - Duration::from_millis(1);
475 node1.tick();
476 let ready = node1.take_ready();
477 let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
478 node1.handle_request_vote_response(2, &resp2);
479 assert_eq!(node1.role(), NodeRole::Leader);
480
481 let heartbeat_ready = node1.take_ready();
482 for (peer_id, msg) in &heartbeat_ready.messages {
483 if *peer_id == 2 {
484 let resp = node2.handle_append_entries(msg);
485 node1.handle_append_entries_response(2, &resp);
486 }
487 }
488
489 let idx = node1.propose(b"cmd1".to_vec()).unwrap();
490 assert_eq!(idx, 2);
491
492 let ready = node1.take_ready();
493 for (peer_id, msg) in &ready.messages {
494 if *peer_id == 2 {
495 let resp = node2.handle_append_entries(msg);
496 assert!(resp.success);
497 node1.handle_append_entries_response(2, &resp);
498 }
499 }
500
501 let ready = node1.take_ready();
502 let committed: Vec<_> = ready
503 .committed_entries
504 .iter()
505 .filter(|e| !e.data.is_empty())
506 .collect();
507 assert_eq!(committed.len(), 1);
508 assert_eq!(committed[0].data, b"cmd1");
509 }
510
511 #[test]
512 fn leader_steps_down_on_higher_term() {
513 let config = test_config(1, vec![2, 3]);
514 let mut node = RaftNode::new(config, MemStorage::new());
515
516 node.election_deadline = Instant::now() - Duration::from_millis(1);
517 node.tick();
518 let _ready = node.take_ready();
519 let resp = RequestVoteResponse {
520 term: 1,
521 vote_granted: true,
522 };
523 node.handle_request_vote_response(2, &resp);
524 assert_eq!(node.role(), NodeRole::Leader);
525
526 let req = AppendEntriesRequest {
527 term: 5,
528 leader_id: 2,
529 prev_log_index: 0,
530 prev_log_term: 0,
531 entries: vec![],
532 leader_commit: 0,
533 group_id: 1,
534 };
535 node.handle_append_entries(&req);
536 assert_eq!(node.role(), NodeRole::Follower);
537 assert_eq!(node.current_term(), 5);
538 assert_eq!(node.leader_id(), 2);
539 }
540}