1use tracing::warn;
6
7use crate::message::{AppendEntriesRequest, AppendEntriesResponse};
8use crate::node::core::RaftNode;
9use crate::state::NodeRole;
10use crate::storage::LogStorage;
11
12impl<S: LogStorage> RaftNode<S> {
13 pub fn handle_append_entries(&mut self, req: &AppendEntriesRequest) -> AppendEntriesResponse {
15 if req.term < self.hard_state.current_term {
16 return AppendEntriesResponse {
17 term: self.hard_state.current_term,
18 success: false,
19 last_log_index: self.log.last_index(),
20 };
21 }
22
23 if req.term > self.hard_state.current_term || self.role == NodeRole::Candidate {
24 self.become_follower(req.term);
26 }
27
28 self.leader_id = req.leader_id;
29 self.reset_election_timeout();
30
31 if req.prev_log_index > 0 {
33 match self.log.term_at(req.prev_log_index) {
34 Some(term) if term == req.prev_log_term => {}
35 _ => {
36 return AppendEntriesResponse {
37 term: self.hard_state.current_term,
38 success: false,
39 last_log_index: self.log.last_index(),
40 };
41 }
42 }
43 }
44
45 if let Err(e) = self.log.append_entries(req.prev_log_index, &req.entries) {
46 warn!(group = self.config.group_id, error = %e, "append_entries failed");
47 return AppendEntriesResponse {
48 term: self.hard_state.current_term,
49 success: false,
50 last_log_index: self.log.last_index(),
51 };
52 }
53
54 if req.leader_commit > self.volatile.commit_index {
55 self.volatile.commit_index = req.leader_commit.min(self.log.last_index());
56 self.collect_committed_entries();
57 }
58
59 AppendEntriesResponse {
60 term: self.hard_state.current_term,
61 success: true,
62 last_log_index: self.log.last_index(),
63 }
64 }
65
66 pub fn handle_append_entries_response(&mut self, peer: u64, resp: &AppendEntriesResponse) {
74 if resp.term > self.hard_state.current_term {
75 self.become_follower(resp.term);
76 return;
77 }
78
79 if self.role != NodeRole::Leader {
80 return;
81 }
82
83 let peer_is_voter = self.config.peers.contains(&peer);
84 let peer_is_observer = self.config.observers.contains(&peer);
85
86 if peer_is_observer {
89 let leader = match self.leader_state.as_mut() {
90 Some(ls) => ls,
91 None => return,
92 };
93 if resp.success {
94 if let Some(state) = leader.observer_state_mut(peer) {
95 let new_match = resp.last_log_index;
96 if new_match > state.match_index {
97 state.match_index = new_match;
98 state.next_index = new_match + 1;
99 }
100 state.pending_count = state.pending_count.saturating_sub(1);
102 }
103 } else {
104 if let Some(state) = leader.observer_state_mut(peer) {
105 let new_next = resp.last_log_index + 1;
106 if new_next < state.next_index {
107 state.next_index = new_next.max(1);
108 } else {
109 state.next_index = state.next_index.saturating_sub(1).max(1);
110 }
111 state.pending_count = state.pending_count.saturating_sub(1);
112 }
113 self.send_append_entries_to_observer(peer);
114 }
115 return;
117 }
118
119 let leader = match self.leader_state.as_mut() {
120 Some(ls) => ls,
121 None => return,
122 };
123
124 if resp.success {
125 let new_match = resp.last_log_index;
126 if new_match > leader.match_index_for(peer) {
127 leader.set_match_index(peer, new_match);
128 leader.set_next_index(peer, new_match + 1);
129 }
130 if peer_is_voter {
131 self.try_advance_commit_index();
132 }
133 } else {
134 let new_next = resp.last_log_index + 1;
135 let current_next = leader.next_index_for(peer);
136 if new_next < current_next {
137 leader.set_next_index(peer, new_next.max(1));
138 } else {
139 leader.set_next_index(peer, current_next.saturating_sub(1).max(1));
140 }
141 self.send_append_entries(peer);
142 }
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use std::time::{Duration, Instant};
149
150 use crate::message::{
151 AppendEntriesRequest, AppendEntriesResponse, LogEntry, RequestVoteResponse,
152 };
153 use crate::node::config::RaftConfig;
154 use crate::node::core::RaftNode;
155 use crate::node::rpc::test_helpers::{setup_leader_with_observer, test_config};
156 use crate::state::NodeRole;
157 use crate::storage::MemStorage;
158
159 #[test]
160 fn follower_rejects_old_term() {
161 let config = test_config(1, vec![2, 3]);
162 let mut node = RaftNode::new(config, MemStorage::new());
163 node.hard_state.current_term = 5;
164
165 let req = AppendEntriesRequest {
166 term: 3,
167 leader_id: 2,
168 prev_log_index: 0,
169 prev_log_term: 0,
170 entries: vec![],
171 leader_commit: 0,
172 group_id: 1,
173 };
174
175 let resp = node.handle_append_entries(&req);
176 assert!(!resp.success);
177 assert_eq!(resp.term, 5);
178 }
179
180 #[test]
181 fn follower_accepts_valid_append() {
182 let config = test_config(1, vec![2, 3]);
183 let mut node = RaftNode::new(config, MemStorage::new());
184
185 let req = AppendEntriesRequest {
186 term: 1,
187 leader_id: 2,
188 prev_log_index: 0,
189 prev_log_term: 0,
190 entries: vec![
191 LogEntry {
192 term: 1,
193 index: 1,
194 data: b"a".to_vec(),
195 },
196 LogEntry {
197 term: 1,
198 index: 2,
199 data: b"b".to_vec(),
200 },
201 ],
202 leader_commit: 1,
203 group_id: 1,
204 };
205
206 let resp = node.handle_append_entries(&req);
207 assert!(resp.success);
208 assert_eq!(resp.last_log_index, 2);
209 assert_eq!(node.commit_index(), 1);
210 assert_eq!(node.leader_id(), 2);
211 }
212
213 #[test]
214 fn learner_accepts_append_entries_and_stays_learner() {
215 let mut config = test_config(2, vec![1]);
216 config.starts_as_learner = true;
217 let mut node = RaftNode::new(config, MemStorage::new());
218
219 let req = AppendEntriesRequest {
220 term: 1,
221 leader_id: 1,
222 prev_log_index: 0,
223 prev_log_term: 0,
224 entries: vec![LogEntry {
225 term: 1,
226 index: 1,
227 data: b"x".to_vec(),
228 }],
229 leader_commit: 1,
230 group_id: 1,
231 };
232
233 let resp = node.handle_append_entries(&req);
234 assert!(resp.success);
235 assert_eq!(node.commit_index(), 1);
236 assert_eq!(node.role(), NodeRole::Learner);
238 assert_eq!(node.leader_id(), 1);
239 }
240
241 #[test]
242 fn leader_steps_down_on_higher_term() {
243 let config = test_config(1, vec![2, 3]);
244 let mut node = RaftNode::new(config, MemStorage::new());
245
246 node.election_deadline = Instant::now() - Duration::from_millis(1);
247 node.tick();
248 let _ready = node.take_ready();
249 let resp = RequestVoteResponse {
250 term: 1,
251 vote_granted: true,
252 };
253 node.handle_request_vote_response(2, &resp);
254 assert_eq!(node.role(), NodeRole::Leader);
255
256 let req = AppendEntriesRequest {
257 term: 5,
258 leader_id: 2,
259 prev_log_index: 0,
260 prev_log_term: 0,
261 entries: vec![],
262 leader_commit: 0,
263 group_id: 1,
264 };
265 node.handle_append_entries(&req);
266 assert_eq!(node.role(), NodeRole::Follower);
267 assert_eq!(node.current_term(), 5);
268 assert_eq!(node.leader_id(), 2);
269 }
270
271 #[test]
275 fn learner_ae_response_does_not_drive_commit() {
276 let mut config = test_config(1, vec![2, 3]);
279 config.learners = vec![4];
280 let mut node = RaftNode::new(config, MemStorage::new());
281
282 node.election_deadline_override(Instant::now() - Duration::from_millis(1));
284 node.tick();
285 let yes = RequestVoteResponse {
287 term: 1,
288 vote_granted: true,
289 };
290 node.handle_request_vote_response(2, &yes);
291 assert_eq!(node.role(), NodeRole::Leader);
292 let _ = node.take_ready();
293
294 let idx = node.propose(b"cmd".to_vec()).unwrap();
296 assert_eq!(idx, 2);
297 let _ = node.take_ready();
298
299 let baseline_commit = node.commit_index();
301 assert!(baseline_commit < 2);
302
303 let ae_ok = AppendEntriesResponse {
305 term: 1,
306 success: true,
307 last_log_index: 2,
308 };
309 node.handle_append_entries_response(4, &ae_ok);
310 assert_eq!(
311 node.commit_index(),
312 baseline_commit,
313 "learner ACK must not contribute to commit quorum"
314 );
315
316 node.handle_append_entries_response(2, &ae_ok);
318 assert_eq!(node.commit_index(), 2);
319 }
320
321 #[test]
322 fn three_node_replication() {
323 let config1 = test_config(1, vec![2, 3]);
324 let config2 = test_config(2, vec![1, 3]);
325
326 let mut node1 = RaftNode::new(config1, MemStorage::new());
327 let mut node2 = RaftNode::new(config2, MemStorage::new());
328
329 node1.election_deadline = Instant::now() - Duration::from_millis(1);
330 node1.tick();
331 let ready = node1.take_ready();
332 let resp2 = node2.handle_request_vote(&ready.vote_requests[0].1);
333 node1.handle_request_vote_response(2, &resp2);
334 assert_eq!(node1.role(), NodeRole::Leader);
335
336 let heartbeat_ready = node1.take_ready();
337 for (peer_id, msg) in &heartbeat_ready.messages {
338 if *peer_id == 2 {
339 let resp = node2.handle_append_entries(msg);
340 node1.handle_append_entries_response(2, &resp);
341 }
342 }
343
344 let idx = node1.propose(b"cmd1".to_vec()).unwrap();
345 assert_eq!(idx, 2);
346
347 let ready = node1.take_ready();
348 for (peer_id, msg) in &ready.messages {
349 if *peer_id == 2 {
350 let resp = node2.handle_append_entries(msg);
351 assert!(resp.success);
352 node1.handle_append_entries_response(2, &resp);
353 }
354 }
355
356 let ready = node1.take_ready();
357 let committed: Vec<_> = ready
358 .committed_entries
359 .iter()
360 .filter(|e| !e.data.is_empty())
361 .collect();
362 assert_eq!(committed.len(), 1);
363 assert_eq!(committed[0].data, b"cmd1");
364 }
365
366 #[test]
369 fn observer_receives_entries_but_does_not_contribute_to_quorum() {
370 let (mut leader, mut obs) = setup_leader_with_observer();
371
372 let idx = leader.propose(b"x".to_vec()).unwrap();
374 assert_eq!(idx, 2);
375 let ready = leader.take_ready();
376
377 let baseline_commit = leader.commit_index();
378 assert!(
379 baseline_commit < 2,
380 "commit should not advance without voter ACK"
381 );
382
383 let obs_msg = ready
385 .messages
386 .iter()
387 .find(|(id, _)| *id == 5)
388 .map(|(_, m)| m.clone());
389 let obs_msg = obs_msg.expect("leader must send to observer");
390 let obs_resp = obs.handle_append_entries(&obs_msg);
391 assert!(obs_resp.success);
392 assert_eq!(
393 obs.role(),
394 NodeRole::Observer,
395 "observer must stay Observer"
396 );
397
398 leader.handle_append_entries_response(5, &obs_resp);
400 assert_eq!(
401 leader.commit_index(),
402 baseline_commit,
403 "observer ack must not contribute to commit quorum"
404 );
405
406 let ae_ok = AppendEntriesResponse {
408 term: 1,
409 success: true,
410 last_log_index: idx,
411 };
412 leader.handle_append_entries_response(2, &ae_ok);
413 assert_eq!(leader.commit_index(), idx);
414 }
415
416 #[test]
419 fn observer_does_not_restore_lost_quorum() {
420 let mut node1 = RaftNode::new(
422 RaftConfig {
423 node_id: 1,
424 group_id: 1,
425 peers: vec![2, 3],
426 learners: vec![],
427 observers: vec![5],
428 starts_as_learner: false,
429 starts_as_observer: false,
430 election_timeout_min: Duration::from_millis(150),
431 election_timeout_max: Duration::from_millis(300),
432 heartbeat_interval: Duration::from_millis(50),
433 },
434 MemStorage::new(),
435 );
436 node1.election_deadline = Instant::now() - Duration::from_millis(1);
438 node1.tick();
439 let _ = node1.take_ready();
440 for v in [2u64, 3] {
441 node1.handle_request_vote_response(
442 v,
443 &RequestVoteResponse {
444 term: 1,
445 vote_granted: true,
446 },
447 );
448 }
449 assert_eq!(node1.role(), NodeRole::Leader);
450 let _ = node1.take_ready();
451
452 let idx = node1.propose(b"cmd".to_vec()).unwrap();
454 let _ = node1.take_ready();
455 let pre_commit = node1.commit_index();
456 assert!(pre_commit < idx);
457
458 let obs_ack = AppendEntriesResponse {
460 term: 1,
461 success: true,
462 last_log_index: idx,
463 };
464 node1.handle_append_entries_response(5, &obs_ack);
465 assert_eq!(
466 node1.commit_index(),
467 pre_commit,
468 "quorum is lost (2 voters dead); observer ack must not restore it"
469 );
470 }
471
472 #[test]
475 fn observer_crash_does_not_stall_source() {
476 let (mut leader, _obs) = setup_leader_with_observer();
477
478 let idx = leader.propose(b"y".to_vec()).unwrap();
479 assert_eq!(idx, 2);
480 let ready = leader.take_ready();
481
482 let voter_ack = AppendEntriesResponse {
484 term: 1,
485 success: true,
486 last_log_index: idx,
487 };
488 leader.handle_append_entries_response(2, &voter_ack);
489 assert_eq!(
490 leader.commit_index(),
491 idx,
492 "source must commit without observer ack (observer crash)"
493 );
494 let _ = ready;
495 }
496}