Skip to main content

nodedb_raft/node/rpc/
append_entries.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `AppendEntries` request and response handlers.
4
5use tracing::warn;
6
7use crate::message::{AppendEntriesRequest, AppendEntriesResponse};
8use crate::node::core::RaftNode;
9use crate::state::NodeRole;
10use crate::storage::LogStorage;
11
12impl<S: LogStorage> RaftNode<S> {
13    /// Handle incoming AppendEntries RPC.
14    pub fn handle_append_entries(&mut self, req: &AppendEntriesRequest) -> AppendEntriesResponse {
15        if req.term < self.hard_state.current_term {
16            return AppendEntriesResponse {
17                term: self.hard_state.current_term,
18                success: false,
19                last_log_index: self.log.last_index(),
20            };
21        }
22
23        if req.term > self.hard_state.current_term || self.role == NodeRole::Candidate {
24            // `become_follower` preserves Learner role — see internal.rs.
25            self.become_follower(req.term);
26        }
27
28        self.leader_id = req.leader_id;
29        self.reset_election_timeout();
30
31        // Check prev_log consistency.
32        if req.prev_log_index > 0 {
33            match self.log.term_at(req.prev_log_index) {
34                Some(term) if term == req.prev_log_term => {}
35                _ => {
36                    return AppendEntriesResponse {
37                        term: self.hard_state.current_term,
38                        success: false,
39                        last_log_index: self.log.last_index(),
40                    };
41                }
42            }
43        }
44
45        if let Err(e) = self.log.append_entries(req.prev_log_index, &req.entries) {
46            warn!(group = self.config.group_id, error = %e, "append_entries failed");
47            return AppendEntriesResponse {
48                term: self.hard_state.current_term,
49                success: false,
50                last_log_index: self.log.last_index(),
51            };
52        }
53
54        if req.leader_commit > self.volatile.commit_index {
55            self.volatile.commit_index = req.leader_commit.min(self.log.last_index());
56            self.collect_committed_entries();
57        }
58
59        AppendEntriesResponse {
60            term: self.hard_state.current_term,
61            success: true,
62            last_log_index: self.log.last_index(),
63        }
64    }
65
66    /// Handle AppendEntries response from a peer (leader only).
67    ///
68    /// For voter peers: update match/next index and attempt commit advancement.
69    /// For learner peers: update match/next index only (no quorum contribution).
70    /// For observer peers: update observer state advisorily — no quorum
71    /// contribution, no commit advancement. Observer acks release backpressure
72    /// so the leader resumes sending to that observer.
73    pub fn handle_append_entries_response(&mut self, peer: u64, resp: &AppendEntriesResponse) {
74        if resp.term > self.hard_state.current_term {
75            self.become_follower(resp.term);
76            return;
77        }
78
79        if self.role != NodeRole::Leader {
80            return;
81        }
82
83        let peer_is_voter = self.config.peers.contains(&peer);
84        let peer_is_observer = self.config.observers.contains(&peer);
85
86        // Observer acks are advisory: update observer state and release
87        // backpressure, but never advance commit index.
88        if peer_is_observer {
89            let leader = match self.leader_state.as_mut() {
90                Some(ls) => ls,
91                None => return,
92            };
93            if resp.success {
94                if let Some(state) = leader.observer_state_mut(peer) {
95                    let new_match = resp.last_log_index;
96                    if new_match > state.match_index {
97                        state.match_index = new_match;
98                        state.next_index = new_match + 1;
99                    }
100                    // Release backpressure: observer drained some entries.
101                    state.pending_count = state.pending_count.saturating_sub(1);
102                }
103            } else {
104                if let Some(state) = leader.observer_state_mut(peer) {
105                    let new_next = resp.last_log_index + 1;
106                    if new_next < state.next_index {
107                        state.next_index = new_next.max(1);
108                    } else {
109                        state.next_index = state.next_index.saturating_sub(1).max(1);
110                    }
111                    state.pending_count = state.pending_count.saturating_sub(1);
112                }
113                self.send_append_entries_to_observer(peer);
114            }
115            // Observer acks never trigger commit advancement — return here.
116            return;
117        }
118
119        let leader = match self.leader_state.as_mut() {
120            Some(ls) => ls,
121            None => return,
122        };
123
124        if resp.success {
125            let new_match = resp.last_log_index;
126            if new_match > leader.match_index_for(peer) {
127                leader.set_match_index(peer, new_match);
128                leader.set_next_index(peer, new_match + 1);
129            }
130            if peer_is_voter {
131                self.try_advance_commit_index();
132            }
133        } else {
134            let new_next = resp.last_log_index + 1;
135            let current_next = leader.next_index_for(peer);
136            if new_next < current_next {
137                leader.set_next_index(peer, new_next.max(1));
138            } else {
139                leader.set_next_index(peer, current_next.saturating_sub(1).max(1));
140            }
141            self.send_append_entries(peer);
142        }
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use std::time::{Duration, Instant};
149
150    use crate::message::{
151        AppendEntriesRequest, AppendEntriesResponse, LogEntry, RequestVoteResponse,
152    };
153    use crate::node::config::RaftConfig;
154    use crate::node::core::RaftNode;
155    use crate::node::rpc::test_helpers::{setup_leader_with_observer, test_config};
156    use crate::state::NodeRole;
157    use crate::storage::MemStorage;
158
159    #[test]
160    fn follower_rejects_old_term() {
161        let config = test_config(1, vec![2, 3]);
162        let mut node = RaftNode::new(config, MemStorage::new());
163        node.hard_state.current_term = 5;
164
165        let req = AppendEntriesRequest {
166            term: 3,
167            leader_id: 2,
168            prev_log_index: 0,
169            prev_log_term: 0,
170            entries: vec![],
171            leader_commit: 0,
172            group_id: 1,
173        };
174
175        let resp = node.handle_append_entries(&req);
176        assert!(!resp.success);
177        assert_eq!(resp.term, 5);
178    }
179
180    #[test]
181    fn follower_accepts_valid_append() {
182        let config = test_config(1, vec![2, 3]);
183        let mut node = RaftNode::new(config, MemStorage::new());
184
185        let req = AppendEntriesRequest {
186            term: 1,
187            leader_id: 2,
188            prev_log_index: 0,
189            prev_log_term: 0,
190            entries: vec![
191                LogEntry {
192                    term: 1,
193                    index: 1,
194                    data: b"a".to_vec(),
195                },
196                LogEntry {
197                    term: 1,
198                    index: 2,
199                    data: b"b".to_vec(),
200                },
201            ],
202            leader_commit: 1,
203            group_id: 1,
204        };
205
206        let resp = node.handle_append_entries(&req);
207        assert!(resp.success);
208        assert_eq!(resp.last_log_index, 2);
209        assert_eq!(node.commit_index(), 1);
210        assert_eq!(node.leader_id(), 2);
211    }
212
213    #[test]
214    fn learner_accepts_append_entries_and_stays_learner() {
215        let mut config = test_config(2, vec![1]);
216        config.starts_as_learner = true;
217        let mut node = RaftNode::new(config, MemStorage::new());
218
219        let req = AppendEntriesRequest {
220            term: 1,
221            leader_id: 1,
222            prev_log_index: 0,
223            prev_log_term: 0,
224            entries: vec![LogEntry {
225                term: 1,
226                index: 1,
227                data: b"x".to_vec(),
228            }],
229            leader_commit: 1,
230            group_id: 1,
231        };
232
233        let resp = node.handle_append_entries(&req);
234        assert!(resp.success);
235        assert_eq!(node.commit_index(), 1);
236        // Crucially, the learner did not turn into a Follower.
237        assert_eq!(node.role(), NodeRole::Learner);
238        assert_eq!(node.leader_id(), 1);
239    }
240
241    #[test]
242    fn leader_steps_down_on_higher_term() {
243        let config = test_config(1, vec![2, 3]);
244        let mut node = RaftNode::new(config, MemStorage::new());
245
246        node.election_deadline = Instant::now() - Duration::from_millis(1);
247        node.tick();
248        let _ready = node.take_ready();
249        let resp = RequestVoteResponse {
250            term: 1,
251            vote_granted: true,
252        };
253        node.handle_request_vote_response(2, &resp);
254        assert_eq!(node.role(), NodeRole::Leader);
255
256        let req = AppendEntriesRequest {
257            term: 5,
258            leader_id: 2,
259            prev_log_index: 0,
260            prev_log_term: 0,
261            entries: vec![],
262            leader_commit: 0,
263            group_id: 1,
264        };
265        node.handle_append_entries(&req);
266        assert_eq!(node.role(), NodeRole::Follower);
267        assert_eq!(node.current_term(), 5);
268        assert_eq!(node.leader_id(), 2);
269    }
270
271    /// Learner AE responses update match_index but must NOT trigger a
272    /// commit advancement that relies on the learner counting toward
273    /// quorum.
274    #[test]
275    fn learner_ae_response_does_not_drive_commit() {
276        // 3 voters + 1 learner cluster: quorum = 2. Without any voter ACK,
277        // a learner "ack" must not advance commit_index.
278        let mut config = test_config(1, vec![2, 3]);
279        config.learners = vec![4];
280        let mut node = RaftNode::new(config, MemStorage::new());
281
282        // Force leader.
283        node.election_deadline_override(Instant::now() - Duration::from_millis(1));
284        node.tick();
285        // Grant self-vote via two voter responses.
286        let yes = RequestVoteResponse {
287            term: 1,
288            vote_granted: true,
289        };
290        node.handle_request_vote_response(2, &yes);
291        assert_eq!(node.role(), NodeRole::Leader);
292        let _ = node.take_ready();
293
294        // Propose an entry at index 2 (no-op is index 1).
295        let idx = node.propose(b"cmd".to_vec()).unwrap();
296        assert_eq!(idx, 2);
297        let _ = node.take_ready();
298
299        // Baseline: commit_index should still be <2 (no voter ACKs yet for index 2).
300        let baseline_commit = node.commit_index();
301        assert!(baseline_commit < 2);
302
303        // Learner (peer 4) ACKs index 2. This must NOT advance commit.
304        let ae_ok = AppendEntriesResponse {
305            term: 1,
306            success: true,
307            last_log_index: 2,
308        };
309        node.handle_append_entries_response(4, &ae_ok);
310        assert_eq!(
311            node.commit_index(),
312            baseline_commit,
313            "learner ACK must not contribute to commit quorum"
314        );
315
316        // Now a voter (peer 2) ACKs index 2. Quorum = 2 (self + peer 2) — commit advances.
317        node.handle_append_entries_response(2, &ae_ok);
318        assert_eq!(node.commit_index(), 2);
319    }
320
321    #[test]
322    fn three_node_replication() {
323        let config1 = test_config(1, vec![2, 3]);
324        let config2 = test_config(2, vec![1, 3]);
325
326        let mut node1 = RaftNode::new(config1, MemStorage::new());
327        let mut node2 = RaftNode::new(config2, MemStorage::new());
328
329        node1.election_deadline = Instant::now() - Duration::from_millis(1);
330        node1.tick();
331        let ready = node1.take_ready();
332        let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
333        node1.handle_request_vote_response(2, &resp2);
334        assert_eq!(node1.role(), NodeRole::Leader);
335
336        let heartbeat_ready = node1.take_ready();
337        for (peer_id, msg) in &heartbeat_ready.messages {
338            if *peer_id == 2 {
339                let resp = node2.handle_append_entries(msg);
340                node1.handle_append_entries_response(2, &resp);
341            }
342        }
343
344        let idx = node1.propose(b"cmd1".to_vec()).unwrap();
345        assert_eq!(idx, 2);
346
347        let ready = node1.take_ready();
348        for (peer_id, msg) in &ready.messages {
349            if *peer_id == 2 {
350                let resp = node2.handle_append_entries(msg);
351                assert!(resp.success);
352                node1.handle_append_entries_response(2, &resp);
353            }
354        }
355
356        let ready = node1.take_ready();
357        let committed: Vec<_> = ready
358            .committed_entries
359            .iter()
360            .filter(|e| !e.data.is_empty())
361            .collect();
362        assert_eq!(committed.len(), 1);
363        assert_eq!(committed[0].data, b"cmd1");
364    }
365
366    /// An observer receives AppendEntries, applies them, and stays in the
367    /// Observer role. Its ack must NOT advance the source commit index.
368    #[test]
369    fn observer_receives_entries_but_does_not_contribute_to_quorum() {
370        let (mut leader, mut obs) = setup_leader_with_observer();
371
372        // Propose an entry. Quorum = 2 (self + peer 2).
373        let idx = leader.propose(b"x".to_vec()).unwrap();
374        assert_eq!(idx, 2);
375        let ready = leader.take_ready();
376
377        let baseline_commit = leader.commit_index();
378        assert!(
379            baseline_commit < 2,
380            "commit should not advance without voter ACK"
381        );
382
383        // Observer receives the entry.
384        let obs_msg = ready
385            .messages
386            .iter()
387            .find(|(id, _)| *id == 5)
388            .map(|(_, m)| m.clone());
389        let obs_msg = obs_msg.expect("leader must send to observer");
390        let obs_resp = obs.handle_append_entries(&obs_msg);
391        assert!(obs_resp.success);
392        assert_eq!(
393            obs.role(),
394            NodeRole::Observer,
395            "observer must stay Observer"
396        );
397
398        // Feed observer ack back to leader. Commit must NOT advance.
399        leader.handle_append_entries_response(5, &obs_resp);
400        assert_eq!(
401            leader.commit_index(),
402            baseline_commit,
403            "observer ack must not contribute to commit quorum"
404        );
405
406        // Now voter 2 ACKs — quorum (self + peer 2) is met and commit advances.
407        let ae_ok = AppendEntriesResponse {
408            term: 1,
409            success: true,
410            last_log_index: idx,
411        };
412        leader.handle_append_entries_response(2, &ae_ok);
413        assert_eq!(leader.commit_index(), idx);
414    }
415
416    /// 3 voters + 1 observer: kill 2 voters → cluster loses quorum even
417    /// though the observer is still up and acking.
418    #[test]
419    fn observer_does_not_restore_lost_quorum() {
420        // Node 1 is leader, voters 2 + 3, observer 5.
421        let mut node1 = RaftNode::new(
422            RaftConfig {
423                node_id: 1,
424                group_id: 1,
425                peers: vec![2, 3],
426                learners: vec![],
427                observers: vec![5],
428                starts_as_learner: false,
429                starts_as_observer: false,
430                election_timeout_min: Duration::from_millis(150),
431                election_timeout_max: Duration::from_millis(300),
432                heartbeat_interval: Duration::from_millis(50),
433            },
434            MemStorage::new(),
435        );
436        // Elect node 1.
437        node1.election_deadline = Instant::now() - Duration::from_millis(1);
438        node1.tick();
439        let _ = node1.take_ready();
440        for v in [2u64, 3] {
441            node1.handle_request_vote_response(
442                v,
443                &RequestVoteResponse {
444                    term: 1,
445                    vote_granted: true,
446                },
447            );
448        }
449        assert_eq!(node1.role(), NodeRole::Leader);
450        let _ = node1.take_ready();
451
452        // Propose an entry at index 2.
453        let idx = node1.propose(b"cmd".to_vec()).unwrap();
454        let _ = node1.take_ready();
455        let pre_commit = node1.commit_index();
456        assert!(pre_commit < idx);
457
458        // Voters 2 and 3 are "dead". Only observer 5 acks.
459        let obs_ack = AppendEntriesResponse {
460            term: 1,
461            success: true,
462            last_log_index: idx,
463        };
464        node1.handle_append_entries_response(5, &obs_ack);
465        assert_eq!(
466            node1.commit_index(),
467            pre_commit,
468            "quorum is lost (2 voters dead); observer ack must not restore it"
469        );
470    }
471
472    /// An offline observer does not stall the source: voters commit normally
473    /// with no observer acks arriving at all.
474    #[test]
475    fn observer_crash_does_not_stall_source() {
476        let (mut leader, _obs) = setup_leader_with_observer();
477
478        let idx = leader.propose(b"y".to_vec()).unwrap();
479        assert_eq!(idx, 2);
480        let ready = leader.take_ready();
481
482        // Voter 2 acks. Observer 5 is "offline" (no ack received).
483        let voter_ack = AppendEntriesResponse {
484            term: 1,
485            success: true,
486            last_log_index: idx,
487        };
488        leader.handle_append_entries_response(2, &voter_ack);
489        assert_eq!(
490            leader.commit_index(),
491            idx,
492            "source must commit without observer ack (observer crash)"
493        );
494        let _ = ready;
495    }
496}