Skip to main content

nodedb_raft/node/
rpc.rs

1//! RPC handlers for incoming Raft messages.
2
3use tracing::{debug, info, warn};
4
5use crate::message::{
6    AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
7    RequestVoteRequest, RequestVoteResponse,
8};
9use crate::state::NodeRole;
10use crate::storage::LogStorage;
11
12use super::core::RaftNode;
13
14impl<S: LogStorage> RaftNode<S> {
15    /// Handle incoming AppendEntries RPC.
16    pub fn handle_append_entries(&mut self, req: &AppendEntriesRequest) -> AppendEntriesResponse {
17        if req.term < self.hard_state.current_term {
18            return AppendEntriesResponse {
19                term: self.hard_state.current_term,
20                success: false,
21                last_log_index: self.log.last_index(),
22            };
23        }
24
25        if req.term > self.hard_state.current_term || self.role == NodeRole::Candidate {
26            // `become_follower` preserves Learner role — see internal.rs.
27            self.become_follower(req.term);
28        }
29
30        self.leader_id = req.leader_id;
31        self.reset_election_timeout();
32
33        // Check prev_log consistency.
34        if req.prev_log_index > 0 {
35            match self.log.term_at(req.prev_log_index) {
36                Some(term) if term == req.prev_log_term => {}
37                _ => {
38                    return AppendEntriesResponse {
39                        term: self.hard_state.current_term,
40                        success: false,
41                        last_log_index: self.log.last_index(),
42                    };
43                }
44            }
45        }
46
47        if let Err(e) = self.log.append_entries(req.prev_log_index, &req.entries) {
48            warn!(group = self.config.group_id, error = %e, "append_entries failed");
49            return AppendEntriesResponse {
50                term: self.hard_state.current_term,
51                success: false,
52                last_log_index: self.log.last_index(),
53            };
54        }
55
56        if req.leader_commit > self.volatile.commit_index {
57            self.volatile.commit_index = req.leader_commit.min(self.log.last_index());
58            self.collect_committed_entries();
59        }
60
61        AppendEntriesResponse {
62            term: self.hard_state.current_term,
63            success: true,
64            last_log_index: self.log.last_index(),
65        }
66    }
67
68    /// Handle incoming RequestVote RPC.
69    ///
70    /// Learners never grant votes: by definition they are not members of
71    /// the voting set for this term, and granting a vote could let an
72    /// incorrect quorum form.
73    pub fn handle_request_vote(&mut self, req: &RequestVoteRequest) -> RequestVoteResponse {
74        if self.role == NodeRole::Learner {
75            return RequestVoteResponse {
76                term: self.hard_state.current_term,
77                vote_granted: false,
78            };
79        }
80
81        if req.term > self.hard_state.current_term {
82            self.become_follower(req.term);
83        }
84
85        if req.term < self.hard_state.current_term {
86            return RequestVoteResponse {
87                term: self.hard_state.current_term,
88                vote_granted: false,
89            };
90        }
91
92        let voted_for = self.hard_state.voted_for;
93        let can_vote = voted_for == 0 || voted_for == req.candidate_id;
94
95        let log_ok = req.last_log_term > self.log.last_term()
96            || (req.last_log_term == self.log.last_term()
97                && req.last_log_index >= self.log.last_index());
98
99        if can_vote && log_ok {
100            self.hard_state.voted_for = req.candidate_id;
101            self.persist_hard_state();
102            self.reset_election_timeout();
103
104            debug!(
105                node = self.config.node_id,
106                group = self.config.group_id,
107                candidate = req.candidate_id,
108                term = req.term,
109                "granted vote"
110            );
111
112            RequestVoteResponse {
113                term: self.hard_state.current_term,
114                vote_granted: true,
115            }
116        } else {
117            RequestVoteResponse {
118                term: self.hard_state.current_term,
119                vote_granted: false,
120            }
121        }
122    }
123
124    /// Handle AppendEntries response from a peer (leader only).
125    ///
126    /// For both voter and learner peers we update `match_index`/`next_index`
127    /// so we can tell when a learner has caught up. Only voter responses
128    /// trigger a commit-advancement check — learners never contribute to
129    /// the commit quorum.
130    pub fn handle_append_entries_response(&mut self, peer: u64, resp: &AppendEntriesResponse) {
131        if resp.term > self.hard_state.current_term {
132            self.become_follower(resp.term);
133            return;
134        }
135
136        if self.role != NodeRole::Leader {
137            return;
138        }
139
140        let peer_is_voter = self.config.peers.contains(&peer);
141
142        let leader = match self.leader_state.as_mut() {
143            Some(ls) => ls,
144            None => return,
145        };
146
147        if resp.success {
148            let new_match = resp.last_log_index;
149            if new_match > leader.match_index_for(peer) {
150                leader.set_match_index(peer, new_match);
151                leader.set_next_index(peer, new_match + 1);
152            }
153            if peer_is_voter {
154                self.try_advance_commit_index();
155            }
156        } else {
157            let new_next = resp.last_log_index + 1;
158            let current_next = leader.next_index_for(peer);
159            if new_next < current_next {
160                leader.set_next_index(peer, new_next.max(1));
161            } else {
162                leader.set_next_index(peer, current_next.saturating_sub(1).max(1));
163            }
164            self.send_append_entries(peer);
165        }
166    }
167
168    /// Handle RequestVote response (candidate only).
169    pub fn handle_request_vote_response(&mut self, _peer: u64, resp: &RequestVoteResponse) {
170        if resp.term > self.hard_state.current_term {
171            self.become_follower(resp.term);
172            return;
173        }
174
175        if self.role != NodeRole::Candidate {
176            return;
177        }
178
179        if resp.vote_granted {
180            self.votes_received.push(resp.term);
181            let vote_count = self.votes_received.len() + 1; // +1 for self-vote
182
183            if vote_count >= self.config.quorum() {
184                self.become_leader();
185            }
186        }
187    }
188
189    /// Handle incoming InstallSnapshot RPC (Raft paper Figure 13).
190    ///
191    /// Called on followers (and learners) that are too far behind for
192    /// log-based catch-up. The leader sends its snapshot; the receiver
193    /// replaces its log and state.
194    pub fn handle_install_snapshot(
195        &mut self,
196        req: &InstallSnapshotRequest,
197    ) -> InstallSnapshotResponse {
198        if req.term < self.hard_state.current_term {
199            return InstallSnapshotResponse {
200                term: self.hard_state.current_term,
201            };
202        }
203
204        if req.term > self.hard_state.current_term {
205            self.become_follower(req.term);
206        }
207
208        self.leader_id = req.leader_id;
209        self.reset_election_timeout();
210
211        if req.done && req.last_included_index > self.log.snapshot_index() {
212            info!(
213                node = self.config.node_id,
214                group = self.config.group_id,
215                snapshot_index = req.last_included_index,
216                snapshot_term = req.last_included_term,
217                "applying installed snapshot"
218            );
219
220            self.log
221                .apply_snapshot(req.last_included_index, req.last_included_term);
222
223            if self.volatile.commit_index < req.last_included_index {
224                self.volatile.commit_index = req.last_included_index;
225            }
226            if self.volatile.last_applied < req.last_included_index {
227                self.volatile.last_applied = req.last_included_index;
228            }
229        }
230
231        InstallSnapshotResponse {
232            term: self.hard_state.current_term,
233        }
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use std::time::{Duration, Instant};
240
241    use crate::message::{AppendEntriesRequest, LogEntry, RequestVoteRequest, RequestVoteResponse};
242    use crate::node::config::RaftConfig;
243    use crate::state::NodeRole;
244    use crate::storage::MemStorage;
245
246    use super::*;
247
248    fn test_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
249        RaftConfig {
250            node_id,
251            group_id: 1,
252            peers,
253            learners: vec![],
254            starts_as_learner: false,
255            election_timeout_min: Duration::from_millis(150),
256            election_timeout_max: Duration::from_millis(300),
257            heartbeat_interval: Duration::from_millis(50),
258        }
259    }
260
261    #[test]
262    fn follower_rejects_old_term() {
263        let config = test_config(1, vec![2, 3]);
264        let mut node = RaftNode::new(config, MemStorage::new());
265        node.hard_state.current_term = 5;
266
267        let req = AppendEntriesRequest {
268            term: 3,
269            leader_id: 2,
270            prev_log_index: 0,
271            prev_log_term: 0,
272            entries: vec![],
273            leader_commit: 0,
274            group_id: 1,
275        };
276
277        let resp = node.handle_append_entries(&req);
278        assert!(!resp.success);
279        assert_eq!(resp.term, 5);
280    }
281
282    #[test]
283    fn follower_accepts_valid_append() {
284        let config = test_config(1, vec![2, 3]);
285        let mut node = RaftNode::new(config, MemStorage::new());
286
287        let req = AppendEntriesRequest {
288            term: 1,
289            leader_id: 2,
290            prev_log_index: 0,
291            prev_log_term: 0,
292            entries: vec![
293                LogEntry {
294                    term: 1,
295                    index: 1,
296                    data: b"a".to_vec(),
297                },
298                LogEntry {
299                    term: 1,
300                    index: 2,
301                    data: b"b".to_vec(),
302                },
303            ],
304            leader_commit: 1,
305            group_id: 1,
306        };
307
308        let resp = node.handle_append_entries(&req);
309        assert!(resp.success);
310        assert_eq!(resp.last_log_index, 2);
311        assert_eq!(node.commit_index(), 1);
312        assert_eq!(node.leader_id(), 2);
313    }
314
315    #[test]
316    fn vote_grant_and_reject() {
317        let config = test_config(1, vec![2, 3]);
318        let mut node = RaftNode::new(config, MemStorage::new());
319
320        let req = RequestVoteRequest {
321            term: 1,
322            candidate_id: 2,
323            last_log_index: 0,
324            last_log_term: 0,
325            group_id: 1,
326        };
327        let resp = node.handle_request_vote(&req);
328        assert!(resp.vote_granted);
329
330        let req2 = RequestVoteRequest {
331            term: 1,
332            candidate_id: 3,
333            last_log_index: 0,
334            last_log_term: 0,
335            group_id: 1,
336        };
337        let resp2 = node.handle_request_vote(&req2);
338        assert!(!resp2.vote_granted);
339    }
340
341    #[test]
342    fn learner_rejects_vote_request() {
343        let mut config = test_config(2, vec![1]);
344        config.starts_as_learner = true;
345        let mut node = RaftNode::new(config, MemStorage::new());
346        assert_eq!(node.role(), NodeRole::Learner);
347
348        let req = RequestVoteRequest {
349            term: 5,
350            candidate_id: 1,
351            last_log_index: 10,
352            last_log_term: 4,
353            group_id: 1,
354        };
355        let resp = node.handle_request_vote(&req);
356        assert!(
357            !resp.vote_granted,
358            "learner must never grant a vote, got {resp:?}"
359        );
360    }
361
362    #[test]
363    fn learner_accepts_append_entries_and_stays_learner() {
364        let mut config = test_config(2, vec![1]);
365        config.starts_as_learner = true;
366        let mut node = RaftNode::new(config, MemStorage::new());
367
368        let req = AppendEntriesRequest {
369            term: 1,
370            leader_id: 1,
371            prev_log_index: 0,
372            prev_log_term: 0,
373            entries: vec![LogEntry {
374                term: 1,
375                index: 1,
376                data: b"x".to_vec(),
377            }],
378            leader_commit: 1,
379            group_id: 1,
380        };
381
382        let resp = node.handle_append_entries(&req);
383        assert!(resp.success);
384        assert_eq!(node.commit_index(), 1);
385        // Crucially, the learner did not turn into a Follower.
386        assert_eq!(node.role(), NodeRole::Learner);
387        assert_eq!(node.leader_id(), 1);
388    }
389
390    /// Learner AE responses update match_index but must NOT trigger a
391    /// commit advancement that relies on the learner counting toward
392    /// quorum.
393    #[test]
394    fn learner_ae_response_does_not_drive_commit() {
395        // 3 voters + 1 learner cluster: quorum = 2. Without any voter ACK,
396        // a learner "ack" must not advance commit_index.
397        let mut config = test_config(1, vec![2, 3]);
398        config.learners = vec![4];
399        let mut node = RaftNode::new(config, MemStorage::new());
400
401        // Force leader.
402        node.election_deadline_override(Instant::now() - Duration::from_millis(1));
403        node.tick();
404        // Grant self-vote via two voter responses.
405        let yes = RequestVoteResponse {
406            term: 1,
407            vote_granted: true,
408        };
409        node.handle_request_vote_response(2, &yes);
410        assert_eq!(node.role(), NodeRole::Leader);
411        let _ = node.take_ready();
412
413        // Propose an entry at index 2 (no-op is index 1).
414        let idx = node.propose(b"cmd".to_vec()).unwrap();
415        assert_eq!(idx, 2);
416        let _ = node.take_ready();
417
418        // Baseline: commit_index should still be <2 (no voter ACKs yet for index 2).
419        let baseline_commit = node.commit_index();
420        assert!(baseline_commit < 2);
421
422        // Learner (peer 4) ACKs index 2. This must NOT advance commit.
423        let ae_ok = AppendEntriesResponse {
424            term: 1,
425            success: true,
426            last_log_index: 2,
427        };
428        node.handle_append_entries_response(4, &ae_ok);
429        assert_eq!(
430            node.commit_index(),
431            baseline_commit,
432            "learner ACK must not contribute to commit quorum"
433        );
434
435        // Now a voter (peer 2) ACKs index 2. Quorum = 2 (self + peer 2) — commit advances.
436        node.handle_append_entries_response(2, &ae_ok);
437        assert_eq!(node.commit_index(), 2);
438    }
439
440    #[test]
441    fn three_node_election() {
442        let config1 = test_config(1, vec![2, 3]);
443        let config2 = test_config(2, vec![1, 3]);
444        let config3 = test_config(3, vec![1, 2]);
445
446        let mut node1 = RaftNode::new(config1, MemStorage::new());
447        let mut node2 = RaftNode::new(config2, MemStorage::new());
448        let mut node3 = RaftNode::new(config3, MemStorage::new());
449
450        node1.election_deadline = Instant::now() - Duration::from_millis(1);
451        node1.tick();
452        assert_eq!(node1.role(), NodeRole::Candidate);
453
454        let ready = node1.take_ready();
455        assert_eq!(ready.vote_requests.len(), 2);
456
457        let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
458        let resp3 = node3.handle_request_vote(&ready.vote_requests[1].1);
459        assert!(resp2.vote_granted);
460        assert!(resp3.vote_granted);
461
462        node1.handle_request_vote_response(2, &resp2);
463        assert_eq!(node1.role(), NodeRole::Leader);
464    }
465
466    #[test]
467    fn three_node_replication() {
468        let config1 = test_config(1, vec![2, 3]);
469        let config2 = test_config(2, vec![1, 3]);
470
471        let mut node1 = RaftNode::new(config1, MemStorage::new());
472        let mut node2 = RaftNode::new(config2, MemStorage::new());
473
474        node1.election_deadline = Instant::now() - Duration::from_millis(1);
475        node1.tick();
476        let ready = node1.take_ready();
477        let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
478        node1.handle_request_vote_response(2, &resp2);
479        assert_eq!(node1.role(), NodeRole::Leader);
480
481        let heartbeat_ready = node1.take_ready();
482        for (peer_id, msg) in &heartbeat_ready.messages {
483            if *peer_id == 2 {
484                let resp = node2.handle_append_entries(msg);
485                node1.handle_append_entries_response(2, &resp);
486            }
487        }
488
489        let idx = node1.propose(b"cmd1".to_vec()).unwrap();
490        assert_eq!(idx, 2);
491
492        let ready = node1.take_ready();
493        for (peer_id, msg) in &ready.messages {
494            if *peer_id == 2 {
495                let resp = node2.handle_append_entries(msg);
496                assert!(resp.success);
497                node1.handle_append_entries_response(2, &resp);
498            }
499        }
500
501        let ready = node1.take_ready();
502        let committed: Vec<_> = ready
503            .committed_entries
504            .iter()
505            .filter(|e| !e.data.is_empty())
506            .collect();
507        assert_eq!(committed.len(), 1);
508        assert_eq!(committed[0].data, b"cmd1");
509    }
510
511    #[test]
512    fn leader_steps_down_on_higher_term() {
513        let config = test_config(1, vec![2, 3]);
514        let mut node = RaftNode::new(config, MemStorage::new());
515
516        node.election_deadline = Instant::now() - Duration::from_millis(1);
517        node.tick();
518        let _ready = node.take_ready();
519        let resp = RequestVoteResponse {
520            term: 1,
521            vote_granted: true,
522        };
523        node.handle_request_vote_response(2, &resp);
524        assert_eq!(node.role(), NodeRole::Leader);
525
526        let req = AppendEntriesRequest {
527            term: 5,
528            leader_id: 2,
529            prev_log_index: 0,
530            prev_log_term: 0,
531            entries: vec![],
532            leader_commit: 0,
533            group_id: 1,
534        };
535        node.handle_append_entries(&req);
536        assert_eq!(node.role(), NodeRole::Follower);
537        assert_eq!(node.current_term(), 5);
538        assert_eq!(node.leader_id(), 2);
539    }
540}