1use crate::error::{RaftError, RaftResult};
4use crate::log::{Command, LogEntry, RaftLog};
5use crate::persistence::{FilePersistence, RaftPersistence};
6use crate::rpc::{
7 AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
8};
9use crate::snapshot::{
10 InstallSnapshotRequest, InstallSnapshotResponse, Snapshot, SnapshotConfig, SnapshotManager,
11 SnapshotPolicy, SnapshotReceiver,
12};
13use crate::state::FencingTokenState;
14use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
15use crate::types::{
16 ClusterConfig, ConfigState, FencingToken, LogIndex, MembershipChange, NodeId, NodeState,
17 RaftConfig, Term,
18};
19use crate::wal::{CorruptionPolicy, WalReader};
20use parking_lot::RwLock;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::{Duration, Instant};
25use tracing::{debug, info, warn};
26
/// A single Raft consensus participant: owns persistent and volatile Raft
/// state, the replicated log, and per-role bookkeeping.
///
/// Shared state is wrapped in `Arc<parking_lot::RwLock<_>>` so the node can be
/// driven concurrently by RPC handlers, election timers, and replication loops.
pub struct RaftNode {
    /// Static node configuration (id, peers, timeouts); immutable after construction.
    config: Arc<RaftConfig>,
    /// Raft persistent state (`current_term`, `voted_for`); mirrored to `persistence`.
    persistent: Arc<RwLock<PersistentState>>,
    /// Role, believed leader id, and other restart-rebuildable state.
    volatile: Arc<RwLock<VolatileState>>,
    /// Replicated log plus commit/applied indices.
    log: Arc<RwLock<RaftLog>>,
    /// Present only while leader (per-peer next/match index tracking).
    leader_state: Arc<RwLock<Option<LeaderState>>>,
    /// Present only while candidate (votes received this election).
    candidate_state: Arc<RwLock<Option<CandidateState>>>,
    /// Last leader contact or vote grant; drives the election timeout.
    last_heartbeat: Arc<RwLock<Instant>>,
    /// Snapshot creation/loading; `None` when no snapshot dir is configured.
    snapshot_manager: Arc<RwLock<Option<SnapshotManager>>>,
    /// In-progress chunked snapshot transfer from a leader, if any.
    snapshot_receiver: Arc<RwLock<Option<SnapshotReceiver>>>,
    /// Durable storage backend; `None` runs fully in-memory.
    persistence: Option<Arc<dyn RaftPersistence>>,
    /// Cluster membership (stable or joint consensus).
    config_state: Arc<RwLock<ConfigState>>,
    // NOTE(review): presumably set during a voluntary step-down/leadership
    // transfer — not exercised in this chunk; confirm at call sites.
    stepping_down: Arc<RwLock<bool>>,
    /// Fencing tokens issued while leader (epoch bumped on each election win).
    fencing_token_state: Arc<FencingTokenState>,
    /// True while WAL replay runs; RPC handlers reject traffic during recovery.
    is_recovering: Arc<AtomicBool>,
}
58
59impl RaftNode {
    /// Builds a node from `config`, recovering durable state when configured.
    ///
    /// Recovery order: validate config → open snapshot manager → open file
    /// persistence (term/vote, log, applied index) → replay WAL into the log.
    /// `is_recovering` is raised around WAL replay so concurrent RPC handlers
    /// refuse to vote or append until the log is consistent again.
    ///
    /// # Errors
    /// Returns `ConfigError` for invalid configs, or any error surfaced while
    /// opening/reading the persistence, snapshot, or WAL directories.
    pub fn new(config: RaftConfig) -> RaftResult<Self> {
        config
            .validate()
            .map_err(|msg| RaftError::ConfigError { message: msg })?;

        // Snapshotting is optional: only enabled when a directory is configured.
        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
            let snap_config =
                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
            Some(SnapshotManager::new(snap_config)?)
        } else {
            None
        };

        // File-backed persistence is likewise optional.
        let persistence: Option<Arc<dyn RaftPersistence>> =
            if let Some(ref dir) = config.persistence_dir {
                Some(Arc::new(FilePersistence::new(dir, config.sync_on_write)?))
            } else {
                None
            };

        // Recover (term, voted_for), the log, and the applied index from disk;
        // otherwise start from a pristine state.
        let (persistent_state, mut raft_log) = if let Some(ref p) = persistence {
            let (term, voted_for) = p.load_state()?;
            let mut ps = PersistentState::new();
            ps.current_term = term;
            ps.voted_for = voted_for;

            let entries = p.load_log()?;
            let mut log = RaftLog::new();
            if !entries.is_empty() {
                log.append_entries(entries)?;
            }

            // Restore commit/applied up to the saved applied index (applied
            // entries were committed before the crash). Failures here are
            // non-fatal: a leader will re-drive commitment.
            let applied_idx = p.load_applied_index()?;
            if applied_idx > 0 && applied_idx <= log.last_index() {
                if let Err(e) = log.set_commit_index(applied_idx) {
                    warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
                } else if let Err(e) = log.set_applied_index(applied_idx) {
                    warn!(applied_idx, error = ?e, "Failed to restore applied index");
                }
            }

            info!(
                node_id = config.node_id,
                term = term,
                voted_for = ?voted_for,
                last_log_index = log.last_index(),
                "Recovered state from persistence"
            );

            (ps, log)
        } else {
            (PersistentState::new(), RaftLog::new())
        };

        // WAL replay with the recovery flag raised; the flag is lowered before
        // the replay error (if any) is propagated.
        let is_recovering = Arc::new(AtomicBool::new(false));
        if let Some(ref wal_dir) = config.wal_dir {
            is_recovering.store(true, Ordering::Release);
            let result = replay_wal_into_log(wal_dir, &mut raft_log);
            is_recovering.store(false, Ordering::Release);
            result?;
        }

        // Peer addresses are unknown at construction time; membership changes
        // fill them in later.
        let initial_members: Vec<(NodeId, String)> =
            config.peers.iter().map(|&id| (id, String::new())).collect();
        let config_state = ConfigState::new_stable(initial_members);

        Ok(Self {
            config: Arc::new(config),
            persistent: Arc::new(RwLock::new(persistent_state)),
            volatile: Arc::new(RwLock::new(VolatileState::new())),
            log: Arc::new(RwLock::new(raft_log)),
            leader_state: Arc::new(RwLock::new(None)),
            candidate_state: Arc::new(RwLock::new(None)),
            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
            snapshot_receiver: Arc::new(RwLock::new(None)),
            persistence,
            config_state: Arc::new(RwLock::new(config_state)),
            stepping_down: Arc::new(RwLock::new(false)),
            fencing_token_state: Arc::new(FencingTokenState::new()),
            is_recovering,
        })
    }
154
    /// Like [`Self::new`], but with a caller-supplied persistence backend
    /// (e.g. an in-memory implementation for tests).
    ///
    /// NOTE(review): the recovery sequence below duplicates the persistence
    /// branch of `new`; keep the two in sync if either changes.
    ///
    /// # Errors
    /// Returns `ConfigError` for invalid configs, or any error from the
    /// persistence backend, snapshot manager, or WAL replay.
    pub fn with_persistence(
        config: RaftConfig,
        persistence: Arc<dyn RaftPersistence>,
    ) -> RaftResult<Self> {
        config
            .validate()
            .map_err(|msg| RaftError::ConfigError { message: msg })?;

        // Optional snapshot support, exactly as in `new`.
        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
            let snap_config =
                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
            Some(SnapshotManager::new(snap_config)?)
        } else {
            None
        };

        // Recover term/vote and the log from the supplied backend.
        let (term, voted_for) = persistence.load_state()?;
        let mut ps = PersistentState::new();
        ps.current_term = term;
        ps.voted_for = voted_for;

        let entries = persistence.load_log()?;
        let mut raft_log = RaftLog::new();
        if !entries.is_empty() {
            raft_log.append_entries(entries)?;
        }

        // Restore commit/applied up to the saved applied index; failures are
        // logged but non-fatal (a leader will re-drive commitment).
        let applied_idx = persistence.load_applied_index()?;
        if applied_idx > 0 && applied_idx <= raft_log.last_index() {
            if let Err(e) = raft_log.set_commit_index(applied_idx) {
                warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
            } else if let Err(e) = raft_log.set_applied_index(applied_idx) {
                warn!(applied_idx, error = ?e, "Failed to restore applied index");
            }
        }

        info!(
            node_id = config.node_id,
            term = term,
            voted_for = ?voted_for,
            last_log_index = raft_log.last_index(),
            "Recovered state via explicit persistence"
        );

        // WAL replay with the recovery flag raised, as in `new`.
        let is_recovering = Arc::new(AtomicBool::new(false));
        if let Some(ref wal_dir) = config.wal_dir {
            is_recovering.store(true, Ordering::Release);
            let result = replay_wal_into_log(wal_dir, &mut raft_log);
            is_recovering.store(false, Ordering::Release);
            result?;
        }

        // Initial membership from configured peer ids; addresses unknown here.
        let initial_members: Vec<(NodeId, String)> =
            config.peers.iter().map(|&id| (id, String::new())).collect();
        let config_state = ConfigState::new_stable(initial_members);

        Ok(Self {
            config: Arc::new(config),
            persistent: Arc::new(RwLock::new(ps)),
            volatile: Arc::new(RwLock::new(VolatileState::new())),
            log: Arc::new(RwLock::new(raft_log)),
            leader_state: Arc::new(RwLock::new(None)),
            candidate_state: Arc::new(RwLock::new(None)),
            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
            snapshot_receiver: Arc::new(RwLock::new(None)),
            persistence: Some(persistence),
            config_state: Arc::new(RwLock::new(config_state)),
            stepping_down: Arc::new(RwLock::new(false)),
            fencing_token_state: Arc::new(FencingTokenState::new()),
            is_recovering,
        })
    }
234
235 fn persist_state(&self, term: Term, voted_for: Option<NodeId>) {
238 if let Some(ref p) = self.persistence {
239 if let Err(e) = p.save_state(term, voted_for) {
240 warn!(node_id = self.node_id(), error = ?e, "Failed to persist state");
241 }
242 }
243 }
244
245 fn persist_log_entries(&self, entries: &[LogEntry]) {
247 if let Some(ref p) = self.persistence {
248 if let Err(e) = p.append_entries(entries) {
249 warn!(node_id = self.node_id(), error = ?e, "Failed to persist log entries");
250 }
251 }
252 }
253
254 fn persist_log_truncation(&self, from_index: LogIndex) {
256 if let Some(ref p) = self.persistence {
257 if let Err(e) = p.truncate_log_from(from_index) {
258 warn!(node_id = self.node_id(), error = ?e, "Failed to persist log truncation");
259 }
260 }
261 }
262
263 pub fn node_id(&self) -> NodeId {
265 self.config.node_id
266 }
267
268 pub fn current_term(&self) -> Term {
270 self.persistent.read().current_term
271 }
272
273 pub fn state(&self) -> NodeState {
275 self.volatile.read().node_state
276 }
277
278 pub fn leader_id(&self) -> Option<NodeId> {
280 self.volatile.read().leader_id
281 }
282
283 pub fn is_leader(&self) -> bool {
285 self.volatile.read().is_leader()
286 }
287
288 pub fn commit_index(&self) -> LogIndex {
290 self.log.read().commit_index()
291 }
292
293 pub fn last_log_index(&self) -> LogIndex {
295 self.log.read().last_index()
296 }
297
    /// Appends `command` to the local log as a new proposal (leader only);
    /// returns the assigned log index. Replication to followers happens
    /// separately via the replication request builders.
    ///
    /// NOTE(review): the leadership check and the append are not atomic — the
    /// volatile lock is dropped before the log lock is taken, so a concurrent
    /// step-down could slip in between. Confirm callers tolerate a proposal
    /// accepted at the edge of a term change.
    ///
    /// # Errors
    /// `NotLeader` (with the believed leader id) when this node is not leader.
    pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
        let volatile = self.volatile.read();
        if !volatile.is_leader() {
            return Err(RaftError::NotLeader {
                leader_id: volatile.leader_id,
            });
        }
        drop(volatile);

        let term = self.current_term();
        let mut log = self.log.write();
        // Clone: `append` consumes the command, and a second copy is needed to
        // build the entry handed to the persistence layer below.
        let index = log.append(term, command.clone());

        let entry = LogEntry::new(term, index, command);
        self.persist_log_entries(&[entry]);

        info!(
            node_id = self.node_id(),
            index = index,
            term = term,
            "Proposed new entry"
        );

        Ok(index)
    }
325
    /// True while WAL replay is in progress; RPC handlers reject votes and
    /// appends during this window.
    pub fn is_recovering(&self) -> bool {
        self.is_recovering.load(Ordering::Acquire)
    }
330
    /// Follower-side RequestVote handler (Raft §5.2, §5.4.1).
    ///
    /// Grants the vote iff: the node is not mid-WAL-replay, the request term
    /// is current, no different candidate was voted for this term, and the
    /// candidate's log is at least as up-to-date as ours.
    pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
        // The log may be incomplete mid-replay; voting now could help elect a
        // leader against a stale view, so refuse outright.
        if self.is_recovering.load(Ordering::Acquire) {
            warn!(
                node_id = self.node_id(),
                candidate = req.candidate_id,
                event = "rpc_rejected_recovering",
                "Rejecting RequestVote: node is recovering from WAL"
            );
            return RequestVoteResponse::rejected(self.current_term());
        }

        let mut persistent = self.persistent.write();
        let mut volatile = self.volatile.write();

        debug!(
            node_id = self.node_id(),
            candidate = req.candidate_id,
            term = req.term,
            "Received RequestVote"
        );

        // Raft §5.1: a higher term immediately demotes us. `update_term`
        // presumably also clears voted_for for the new term — the vote check
        // below relies on that; confirm in PersistentState::update_term.
        if req.term > persistent.current_term {
            let from_term = persistent.current_term;
            persistent.update_term(req.term);
            self.persist_state(persistent.current_term, persistent.voted_for);
            volatile.become_follower(None);
            *self.leader_state.write() = None;
            *self.candidate_state.write() = None;
            debug!(
                node_id = self.node_id(),
                from_term = from_term,
                to_term = persistent.current_term,
                "Stepped down to follower (higher term in RequestVote)"
            );
        }

        // Stale candidate: reply with our term so it can step down.
        if req.term < persistent.current_term {
            warn!(
                node_id = self.node_id(),
                candidate = req.candidate_id,
                current_term = persistent.current_term,
                request_term = req.term,
                "Rejecting vote: stale term"
            );
            return RequestVoteResponse::rejected(persistent.current_term);
        }

        // One vote per term; re-granting to the same candidate is allowed so
        // retransmitted requests stay idempotent.
        if let Some(voted_for) = persistent.voted_for {
            if voted_for != req.candidate_id {
                warn!(
                    node_id = self.node_id(),
                    candidate = req.candidate_id,
                    voted_for = voted_for,
                    "Rejecting vote: already voted"
                );
                return RequestVoteResponse::rejected(persistent.current_term);
            }
        }

        // §5.4.1 up-to-date check: higher last term wins; equal last terms are
        // broken by log length.
        let log = self.log.read();
        let our_last_index = log.last_index();
        let our_last_term = log.last_term();

        let log_ok = req.last_log_term > our_last_term
            || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);

        if !log_ok {
            warn!(
                node_id = self.node_id(),
                candidate = req.candidate_id,
                our_last_index = our_last_index,
                our_last_term = our_last_term,
                candidate_last_index = req.last_log_index,
                candidate_last_term = req.last_log_term,
                "Rejecting vote: candidate log not up-to-date"
            );
            return RequestVoteResponse::rejected(persistent.current_term);
        }

        // All checks passed: record + persist the vote and reset the election
        // timer so we do not immediately challenge the candidate ourselves.
        persistent.grant_vote(req.candidate_id);
        self.persist_state(persistent.current_term, persistent.voted_for);
        *self.last_heartbeat.write() = Instant::now();

        info!(
            node_id = self.node_id(),
            candidate = req.candidate_id,
            term = req.term,
            "Granted vote"
        );

        RequestVoteResponse::granted(persistent.current_term)
    }
430
431 pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
433 if self.is_recovering.load(Ordering::Acquire) {
435 warn!(
436 node_id = self.node_id(),
437 leader = req.leader_id,
438 event = "rpc_rejected_recovering",
439 "Rejecting AppendEntries: node is recovering from WAL"
440 );
441 return AppendEntriesResponse::rejected(self.current_term());
442 }
443
444 let mut persistent = self.persistent.write();
445 let mut volatile = self.volatile.write();
446
447 debug!(
448 node_id = self.node_id(),
449 leader = req.leader_id,
450 term = req.term,
451 entries = req.entries.len(),
452 "Received AppendEntries"
453 );
454
455 if req.term > persistent.current_term {
457 let from_term = persistent.current_term;
458 persistent.update_term(req.term);
459 self.persist_state(persistent.current_term, persistent.voted_for);
460 volatile.become_follower(Some(req.leader_id));
461 *self.leader_state.write() = None;
462 *self.candidate_state.write() = None;
463 debug!(
464 node_id = self.node_id(),
465 from_term = from_term,
466 to_term = persistent.current_term,
467 leader_id = req.leader_id,
468 "Stepped down to follower (higher term in AppendEntries)"
469 );
470 }
471
472 if req.term < persistent.current_term {
474 warn!(
475 node_id = self.node_id(),
476 leader = req.leader_id,
477 current_term = persistent.current_term,
478 request_term = req.term,
479 "Rejecting AppendEntries: stale term"
480 );
481 return AppendEntriesResponse::rejected(persistent.current_term);
482 }
483
484 *self.last_heartbeat.write() = Instant::now();
486 volatile.become_follower(Some(req.leader_id));
487 *self.candidate_state.write() = None;
488
489 drop(persistent);
490 drop(volatile);
491
492 let mut log = self.log.write();
494 let our_last_index = log.last_index();
495
496 if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
498 let conflict_index = req.prev_log_index.min(our_last_index);
500 let conflict_term = log.get_term(conflict_index).unwrap_or(0);
501
502 warn!(
503 node_id = self.node_id(),
504 prev_log_index = req.prev_log_index,
505 prev_log_term = req.prev_log_term,
506 conflict_index = conflict_index,
507 conflict_term = conflict_term,
508 "Rejecting AppendEntries: log inconsistency"
509 );
510
511 return AppendEntriesResponse::failure(
512 self.current_term(),
513 our_last_index,
514 conflict_index,
515 conflict_term,
516 );
517 }
518
519 if !req.entries.is_empty() {
521 let first_new_index = req.entries[0].index;
523 if first_new_index <= our_last_index {
524 if let Err(e) = log.truncate_from(first_new_index) {
525 warn!(
526 node_id = self.node_id(),
527 error = ?e,
528 "Failed to truncate log"
529 );
530 return AppendEntriesResponse::rejected(self.current_term());
531 }
532 self.persist_log_truncation(first_new_index);
533 }
534
535 self.persist_log_entries(&req.entries);
537
538 if let Err(e) = log.append_entries(req.entries) {
540 warn!(
541 node_id = self.node_id(),
542 error = ?e,
543 "Failed to append entries"
544 );
545 return AppendEntriesResponse::rejected(self.current_term());
546 }
547 }
548
549 if req.leader_commit > log.commit_index() {
551 let old_commit = log.commit_index();
552 let new_commit = req.leader_commit.min(log.last_index());
553 if let Err(e) = log.set_commit_index(new_commit) {
554 warn!(
555 node_id = self.node_id(),
556 error = ?e,
557 "Failed to update commit index"
558 );
559 } else {
560 debug!(
561 node_id = self.node_id(),
562 old_commit_index = old_commit,
563 new_commit_index = new_commit,
564 "Updated commit index"
565 );
566 }
567 }
568
569 AppendEntriesResponse::success(self.current_term(), log.last_index())
570 }
571
572 pub fn start_election(&self) -> Vec<RequestVoteRequest> {
574 let mut persistent = self.persistent.write();
575 let mut volatile = self.volatile.write();
576
577 persistent.current_term += 1;
579 persistent.grant_vote(self.node_id());
580
581 self.persist_state(persistent.current_term, persistent.voted_for);
583
584 volatile.become_candidate();
586
587 *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
589
590 let term = persistent.current_term;
591 let log = self.log.read();
592 let last_log_index = log.last_index();
593 let last_log_term = log.last_term();
594
595 let _span =
596 tracing::info_span!("raft_election", node_id = self.node_id(), term = term).entered();
597
598 info!(
599 node_id = self.node_id(),
600 candidate_term = term,
601 log_length = last_log_index,
602 "Started election"
603 );
604
605 self.config
607 .peers
608 .iter()
609 .filter(|&&peer| peer != self.node_id())
610 .map(|&peer| {
611 RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
612 })
613 .collect()
614 }
615
    /// Processes a vote response while campaigning; returns `true` exactly
    /// when this response completed a quorum and the node became leader.
    ///
    /// Responses are ignored when we are no longer a candidate or when the
    /// response term is stale; a higher term demotes us immediately.
    pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
        // Quorum detection runs under the state locks; becoming leader is
        // deferred until the block releases them (become_leader re-locks).
        let should_become_leader = {
            let mut persistent = self.persistent.write();
            let mut volatile = self.volatile.write();

            // Late responses after winning/losing the election are no-ops.
            if !volatile.is_candidate() {
                return false;
            }

            // Raft §5.1: a higher term in any response forces a step-down.
            if resp.term > persistent.current_term {
                let from_term = persistent.current_term;
                persistent.update_term(resp.term);
                self.persist_state(persistent.current_term, persistent.voted_for);
                volatile.become_follower(None);
                *self.candidate_state.write() = None;
                debug!(
                    node_id = self.node_id(),
                    from_term = from_term,
                    to_term = persistent.current_term,
                    "Stepped down to follower (higher term in vote response)"
                );
                return false;
            }

            // Response from an earlier term of ours: ignore.
            if resp.term < persistent.current_term {
                return false;
            }

            if resp.vote_granted {
                let mut candidate_state_guard = self.candidate_state.write();
                if let Some(candidate_state) = candidate_state_guard.as_mut() {
                    // record_vote presumably deduplicates repeat responses
                    // from the same peer — confirm in CandidateState.
                    candidate_state.record_vote(from);

                    info!(
                        node_id = self.node_id(),
                        from = from,
                        votes = candidate_state.vote_count(),
                        quorum = self.config.quorum_size(),
                        "Received vote"
                    );

                    candidate_state.has_quorum(self.config.quorum_size())
                } else {
                    false
                }
            } else {
                false
            }
        };

        if should_become_leader {
            let votes = self
                .candidate_state
                .read()
                .as_ref()
                .map(|cs| cs.vote_count())
                .unwrap_or(0);
            info!(
                node_id = self.node_id(),
                term = self.current_term(),
                votes_received = votes,
                "Won election with quorum"
            );
            self.become_leader();
            return true;
        }

        false
    }
692
    /// Installs leader state after winning an election: switches the role,
    /// initializes per-peer replication indices, and bumps the fencing epoch.
    fn become_leader(&self) {
        let mut volatile = self.volatile.write();
        volatile.become_leader();

        let log = self.log.read();
        let last_log_index = log.last_index();

        // Fresh per-peer next/match indices seeded from our last log index
        // (LeaderState::new presumably sets next_index = last + 1 per §5.3 —
        // confirm in state.rs).
        *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
        *self.candidate_state.write() = None;

        let persistent = self.persistent.read();
        let term = persistent.current_term;

        // NOTE(review): Term is narrowed to u32 here; a term above u32::MAX
        // would silently truncate the fencing epoch. Confirm the intended
        // width of FencingToken terms (validate_fencing_token widens back to
        // u64 for comparison).
        self.fencing_token_state.bump_term_token(term as u32);

        info!(
            node_id = self.node_id(),
            term,
            voted_for = ?persistent.voted_for,
            peer_count = self.config.peers.len(),
            "Became leader"
        );
    }
719
720 pub fn issue_fencing_token(&self) -> Option<FencingToken> {
724 if !self.volatile.read().is_leader() {
725 return None;
726 }
727 Some(self.fencing_token_state.issue_token())
728 }
729
    /// Accepts `token` only when its term equals this node's current term.
    ///
    /// NOTE(review): the token term is widened from u32 to u64 for the
    /// comparison while `become_leader` narrows the node term to u32 when
    /// stamping tokens — terms above u32::MAX can therefore never validate.
    /// Confirm the intended width of fencing-token terms.
    ///
    /// # Errors
    /// `StaleTerm` carrying both the current and the received term.
    pub fn validate_fencing_token(&self, token: &FencingToken) -> RaftResult<()> {
        let current_term = self.current_term();
        if token.term() as u64 == current_term {
            Ok(())
        } else {
            Err(RaftError::StaleTerm {
                current: current_term,
                received: token.term() as u64,
            })
        }
    }
745
746 pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
748 let volatile = self.volatile.read();
749 if !volatile.is_leader() {
750 return Vec::new();
751 }
752 drop(volatile);
753
754 let term = self.current_term();
755 let log = self.log.read();
756 let leader_commit = log.commit_index();
757
758 self.config
759 .peers
760 .iter()
761 .filter(|&&peer| peer != self.node_id())
762 .map(|&peer| {
763 let prev_log_index = log.last_index();
764 let prev_log_term = log.last_term();
765
766 let req = AppendEntriesRequest::heartbeat(
767 term,
768 self.node_id(),
769 prev_log_index,
770 prev_log_term,
771 leader_commit,
772 );
773
774 (peer, req)
775 })
776 .collect()
777 }
778
779 pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
781 let volatile = self.volatile.read();
782 if !volatile.is_leader() {
783 return Vec::new();
784 }
785 drop(volatile);
786
787 let leader_state_guard = self.leader_state.read();
788 let leader_state = match leader_state_guard.as_ref() {
789 Some(state) => state,
790 None => return Vec::new(),
791 };
792
793 let term = self.current_term();
794 let log = self.log.read();
795 let leader_commit = log.commit_index();
796
797 self.config
798 .peers
799 .iter()
800 .filter(|&&peer| peer != self.node_id())
801 .filter_map(|&peer| {
802 let next_index = leader_state.get_next_index(peer);
803
804 if next_index > log.last_index() {
805 return None;
806 }
807
808 let prev_log_index = next_index.saturating_sub(1);
809 let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
810
811 let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
812
813 if entries.is_empty() {
814 return None;
815 }
816
817 let req = AppendEntriesRequest::new(
818 term,
819 self.node_id(),
820 prev_log_index,
821 prev_log_term,
822 entries,
823 leader_commit,
824 );
825
826 Some((peer, req))
827 })
828 .collect()
829 }
830
    /// Alias for [`Self::create_replication_requests`] — presumably retained
    /// for backward compatibility with older callers; confirm before removal.
    pub fn replicate_to_followers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
        self.create_replication_requests()
    }
850
851 pub fn create_replication_request_for(&self, peer: NodeId) -> Option<AppendEntriesRequest> {
856 let volatile = self.volatile.read();
857 if !volatile.is_leader() {
858 return None;
859 }
860 drop(volatile);
861
862 let leader_state_guard = self.leader_state.read();
863 let leader_state = leader_state_guard.as_ref()?;
864
865 let term = self.current_term();
866 let log = self.log.read();
867 let leader_commit = log.commit_index();
868
869 let next_index = leader_state.get_next_index(peer);
870
871 if next_index > log.last_index() {
872 return None;
874 }
875
876 let prev_log_index = next_index.saturating_sub(1);
877 let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
878
879 let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
880
881 if entries.is_empty() {
882 return None;
883 }
884
885 Some(AppendEntriesRequest::new(
886 term,
887 self.node_id(),
888 prev_log_index,
889 prev_log_term,
890 entries,
891 leader_commit,
892 ))
893 }
894
    /// Leader-side handling of an AppendEntries response from `from`:
    /// advances match/next indices on success (possibly committing entries),
    /// or backs off next_index on failure using the follower's conflict hints.
    ///
    /// # Errors
    /// Propagates failures from `set_commit_index` when advancing the commit.
    pub fn handle_replication_response(
        &self,
        from: NodeId,
        resp: AppendEntriesResponse,
    ) -> RaftResult<()> {
        let mut persistent = self.persistent.write();
        let mut volatile = self.volatile.write();

        // Responses arriving after we lost leadership are no-ops.
        if !volatile.is_leader() {
            return Ok(());
        }

        // Raft §5.1: a follower on a higher term demotes us immediately.
        if resp.term > persistent.current_term {
            let from_term = persistent.current_term;
            persistent.update_term(resp.term);
            self.persist_state(persistent.current_term, persistent.voted_for);
            volatile.become_follower(None);
            *self.leader_state.write() = None;
            info!(
                node_id = self.node_id(),
                from_term = from_term,
                to_term = persistent.current_term,
                "Stepped down: leader to follower (higher term in replication response)"
            );
            return Ok(());
        }

        drop(persistent);
        drop(volatile);

        let mut leader_state_guard = self.leader_state.write();
        let leader_state = match leader_state_guard.as_mut() {
            Some(state) => state,
            None => return Ok(()),
        };

        if resp.success {
            // Follower acknowledged up to resp.last_log_index.
            leader_state.update_success(from, resp.last_log_index);

            debug!(
                node_id = self.node_id(),
                peer = from,
                match_index = resp.last_log_index,
                "Replication successful"
            );

            // Joint-consensus-aware quorum: the index must be replicated on a
            // majority of BOTH configurations while in joint mode.
            let config_state = self.config_state.read().clone();
            let new_commit = leader_state.calculate_commit_index_joint(
                self.node_id(),
                self.log.read().last_index(),
                &config_state,
            );

            let mut log = self.log.write();
            if new_commit > log.commit_index() {
                // Raft §5.4.2: only entries from the CURRENT term may be
                // committed by counting replicas; older-term entries commit
                // transitively.
                if let Some(term) = log.get_term(new_commit) {
                    if term == self.current_term() {
                        let old_commit = log.commit_index();
                        log.set_commit_index(new_commit)?;
                        info!(
                            node_id = self.node_id(),
                            old_commit_index = old_commit,
                            new_commit_index = new_commit,
                            "Advanced commit index"
                        );
                    }
                }
            }
        } else {
            // Fast backtracking when the follower supplied conflict hints;
            // otherwise fall back to decrementing next_index.
            if resp.conflict_index.is_some() || resp.conflict_term.is_some() {
                leader_state.update_failure_with_hint(
                    from,
                    resp.conflict_index,
                    resp.conflict_term,
                    resp.last_log_index,
                );
            } else {
                leader_state.update_failure(from);
            }

            warn!(
                node_id = self.node_id(),
                peer = from,
                next_index = leader_state.get_next_index(from),
                conflict_index = ?resp.conflict_index,
                conflict_term = ?resp.conflict_term,
                "Replication failed, will retry with adjusted next_index"
            );
        }

        Ok(())
    }
996
    /// Creates a snapshot of `state_machine_data` and compacts the log when
    /// the manager's entry threshold is reached; returns whether a snapshot
    /// was taken. No-op (`Ok(false)`) when snapshotting is disabled, the
    /// threshold is not met, or nothing has been applied yet.
    ///
    /// # Errors
    /// Propagates snapshot-write and log-compaction failures, plus a
    /// `LogInconsistency` when the applied index's term cannot be resolved.
    pub fn maybe_create_snapshot(&self, state_machine_data: Vec<u8>) -> RaftResult<bool> {
        let mut snap_guard = self.snapshot_manager.write();
        let manager = match snap_guard.as_mut() {
            Some(m) => m,
            None => return Ok(false),
        };

        let log = self.log.read();
        let entries_since = log.entries_since_snapshot();

        if !manager.should_snapshot(entries_since) {
            return Ok(false);
        }

        let applied_index = log.applied_index();
        if applied_index == 0 {
            return Ok(false);
        }

        // The applied entry may itself have been compacted away; in that case
        // the snapshot point supplies its term.
        let applied_term = match log.get_term(applied_index) {
            Some(t) => t,
            None => {
                let (snap_idx, snap_term) = log.get_snapshot_point();
                if applied_index == snap_idx {
                    snap_term
                } else {
                    return Err(RaftError::LogInconsistency {
                        reason: format!(
                            "Cannot determine term for applied index {}",
                            applied_index
                        ),
                    });
                }
            }
        };

        // Release the read lock before the (possibly slow) snapshot write;
        // a write lock is re-taken below for compaction.
        drop(log);

        manager.create_snapshot(state_machine_data, applied_index, applied_term)?;

        // NOTE(review): applied_index may have advanced in the gap above;
        // compaction still uses the value the snapshot captured, which is safe.
        let mut log = self.log.write();
        log.compact_until(applied_index, applied_term)?;

        info!(
            node_id = self.node_id(),
            snapshot_index = applied_index,
            snapshot_term = applied_term,
            "Created snapshot and compacted log"
        );

        Ok(true)
    }
1058
    /// Policy-driven variant of snapshotting: consults `policy`, and only if
    /// it fires calls `state_machine_data_fn` (lazily, with no locks held) to
    /// obtain the snapshot payload. Returns whether a snapshot was taken.
    ///
    /// # Errors
    /// Propagates errors from the data closure, snapshot write, and log
    /// compaction, plus `LogInconsistency` when the applied term is unknown.
    pub fn auto_snapshot_if_needed<F>(
        &self,
        policy: &SnapshotPolicy,
        state_machine_data_fn: F,
    ) -> RaftResult<bool>
    where
        F: FnOnce() -> RaftResult<Vec<u8>>,
    {
        let log = self.log.read();
        let entries_since = log.entries_since_snapshot();
        let applied_index = log.applied_index();

        if !policy.should_snapshot(entries_since, applied_index) {
            return Ok(false);
        }

        if applied_index == 0 {
            return Ok(false);
        }

        // The applied entry may already be compacted; fall back to the
        // snapshot point for its term.
        let applied_term = match log.get_term(applied_index) {
            Some(t) => t,
            None => {
                let (snap_idx, snap_term) = log.get_snapshot_point();
                if applied_index == snap_idx {
                    snap_term
                } else {
                    return Err(RaftError::LogInconsistency {
                        reason: format!(
                            "Cannot determine term for applied index {}",
                            applied_index
                        ),
                    });
                }
            }
        };

        // Drop the log lock before running the caller's (arbitrary) closure.
        drop(log);

        let data = state_machine_data_fn()?;

        let mut snap_guard = self.snapshot_manager.write();
        let manager = match snap_guard.as_mut() {
            Some(m) => m,
            None => return Ok(false),
        };

        manager.create_snapshot(data, applied_index, applied_term)?;
        drop(snap_guard);

        let mut log = self.log.write();
        log.compact_until(applied_index, applied_term)?;

        info!(
            node_id = self.node_id(),
            snapshot_index = applied_index,
            snapshot_term = applied_term,
            entries_compacted = entries_since,
            "Auto-snapshot triggered and log compacted"
        );

        Ok(true)
    }
1129
    /// Follower-side InstallSnapshot handler (Raft §7): accepts chunked
    /// snapshot transfers from the leader, and on completion installs the
    /// snapshot and resets the log to the snapshot point.
    ///
    /// A chunk with `offset == 0` (re)starts the receiver; non-initial chunks
    /// without an active receiver are acknowledged but discarded.
    ///
    /// # Errors
    /// Propagates chunk-reassembly and snapshot-installation failures.
    pub fn handle_install_snapshot(
        &self,
        req: InstallSnapshotRequest,
    ) -> RaftResult<InstallSnapshotResponse> {
        let mut persistent = self.persistent.write();
        let mut volatile = self.volatile.write();

        debug!(
            node_id = self.node_id(),
            leader = req.leader_id,
            term = req.term,
            last_included_index = req.last_included_index,
            last_included_term = req.last_included_term,
            offset = req.offset,
            done = req.done,
            "Received InstallSnapshot"
        );

        // Raft §5.1: a higher term forces a step-down.
        if req.term > persistent.current_term {
            let from_term = persistent.current_term;
            persistent.update_term(req.term);
            self.persist_state(persistent.current_term, persistent.voted_for);
            volatile.become_follower(Some(req.leader_id));
            *self.leader_state.write() = None;
            *self.candidate_state.write() = None;
            debug!(
                node_id = self.node_id(),
                from_term = from_term,
                to_term = persistent.current_term,
                leader_id = req.leader_id,
                "Stepped down to follower (higher term in InstallSnapshot)"
            );
        }

        // Stale leader: reply with our term and do not touch the receiver.
        if req.term < persistent.current_term {
            warn!(
                node_id = self.node_id(),
                leader = req.leader_id,
                current_term = persistent.current_term,
                request_term = req.term,
                "Rejecting InstallSnapshot: stale term"
            );
            return Ok(InstallSnapshotResponse::new(persistent.current_term));
        }

        // Valid leader contact counts as a heartbeat.
        *self.last_heartbeat.write() = Instant::now();
        volatile.become_follower(Some(req.leader_id));
        *self.candidate_state.write() = None;

        let current_term = persistent.current_term;
        drop(persistent);
        drop(volatile);

        let mut receiver_guard = self.snapshot_receiver.write();

        // offset 0 starts (or restarts) a transfer, discarding any partial one.
        if req.offset == 0 {
            *receiver_guard = Some(SnapshotReceiver::new(
                req.last_included_index,
                req.last_included_term,
            ));
        }

        let receiver = match receiver_guard.as_mut() {
            Some(r) => r,
            None => {
                warn!(
                    node_id = self.node_id(),
                    offset = req.offset,
                    "Received non-initial snapshot chunk without active receiver"
                );
                return Ok(InstallSnapshotResponse::new(current_term));
            }
        };

        // Returns Some(snapshot) once the final chunk has been assembled.
        let completed = receiver.receive_chunk(&req)?;

        if let Some(snapshot) = completed {
            *receiver_guard = None;
            drop(receiver_guard);

            // Persist the snapshot (when a manager is configured) …
            let mut snap_guard = self.snapshot_manager.write();
            if let Some(manager) = snap_guard.as_mut() {
                manager.install_snapshot(snapshot)?;
            }

            // … then reset the log so it starts at the snapshot point.
            let mut log = self.log.write();
            log.install_snapshot(req.last_included_index, req.last_included_term);

            info!(
                node_id = self.node_id(),
                last_included_index = req.last_included_index,
                last_included_term = req.last_included_term,
                "Installed snapshot from leader"
            );
        }

        Ok(InstallSnapshotResponse::new(current_term))
    }
1242
    /// Leader-side: builds a single-message InstallSnapshot request for
    /// `target_peer`, or `Ok(None)` when not leader, no snapshot exists, or
    /// the peer can still be caught up from the log (next_index beyond the
    /// snapshot point).
    ///
    /// # Errors
    /// Propagates failures from loading the latest snapshot from disk.
    pub fn prepare_install_snapshot(
        &self,
        target_peer: NodeId,
    ) -> RaftResult<Option<InstallSnapshotRequest>> {
        let volatile = self.volatile.read();
        if !volatile.is_leader() {
            return Ok(None);
        }
        drop(volatile);

        let snap_guard = self.snapshot_manager.read();
        let manager = match snap_guard.as_ref() {
            Some(m) => m,
            None => return Ok(None),
        };

        let snapshot = match manager.load_latest()? {
            Some(s) => s,
            None => return Ok(None),
        };

        // Only ship the snapshot when the peer's next entry has already been
        // compacted out of the log.
        let leader_state_guard = self.leader_state.read();
        if let Some(leader_state) = leader_state_guard.as_ref() {
            let next_index = leader_state.get_next_index(target_peer);
            let log = self.log.read();
            let (snap_idx, _) = log.get_snapshot_point();

            if next_index > snap_idx {
                return Ok(None);
            }
        }

        // `new_complete` packs the whole snapshot into one request (no chunking).
        let term = self.current_term();
        let req = InstallSnapshotRequest::new_complete(
            term,
            self.node_id(),
            snapshot.metadata.last_included_index,
            snapshot.metadata.last_included_term,
            snapshot.data,
        );

        Ok(Some(req))
    }
1292
1293 pub fn follower_needs_snapshot(&self, peer: NodeId) -> bool {
1297 let leader_state_guard = self.leader_state.read();
1298 let leader_state = match leader_state_guard.as_ref() {
1299 Some(s) => s,
1300 None => return false,
1301 };
1302
1303 let next_index = leader_state.get_next_index(peer);
1304 let log = self.log.read();
1305 let (snap_idx, _) = log.get_snapshot_point();
1306
1307 snap_idx > 0 && next_index <= snap_idx
1308 }
1309
1310 pub fn add_node(&self, node_id: NodeId, address: String) -> RaftResult<()> {
1318 self.propose_membership_change(MembershipChange::AddNode { node_id, address })
1319 }
1320
1321 pub fn remove_node(&self, node_id: NodeId) -> RaftResult<()> {
1326 self.propose_membership_change(MembershipChange::RemoveNode { node_id })
1327 }
1328
1329 pub fn cluster_members(&self) -> Vec<(NodeId, String)> {
1331 self.config_state.read().all_members()
1332 }
1333
1334 pub fn is_in_joint_consensus(&self) -> bool {
1336 self.config_state.read().is_joint()
1337 }
1338
1339 pub fn membership_version(&self) -> u64 {
1341 self.config_state.read().version()
1342 }
1343
1344 pub fn propose_membership_change(&self, change: MembershipChange) -> RaftResult<()> {
1352 let volatile = self.volatile.read();
1354 if !volatile.is_leader() {
1355 return Err(RaftError::NotLeader {
1356 leader_id: volatile.leader_id,
1357 });
1358 }
1359 drop(volatile);
1360
1361 let mut cs = self.config_state.write();
1362
1363 if cs.is_joint() {
1365 return Err(RaftError::MembershipChangeInProgress);
1366 }
1367
1368 let current = match &*cs {
1369 ConfigState::Stable(c) => c.clone(),
1370 ConfigState::Joint { .. } => return Err(RaftError::MembershipChangeInProgress),
1371 };
1372
1373 let new_config = match &change {
1375 MembershipChange::AddNode { node_id, address } => {
1376 if current.contains(*node_id) {
1377 return Err(RaftError::NodeAlreadyMember { node_id: *node_id });
1378 }
1379 current.with_added_member(*node_id, address.clone())
1380 }
1381 MembershipChange::RemoveNode { node_id } => {
1382 if !current.contains(*node_id) {
1383 return Err(RaftError::NodeNotMember { node_id: *node_id });
1384 }
1385 current.without_member(*node_id)
1386 }
1387 };
1388
1389 *cs = ConfigState::Joint {
1391 old: current.clone(),
1392 new: new_config.clone(),
1393 };
1394
1395 info!(
1396 node_id = self.node_id(),
1397 change = ?change,
1398 old_version = current.version(),
1399 new_version = new_config.version(),
1400 "Entered joint consensus"
1401 );
1402
1403 self.update_leader_state_for_config(&cs);
1406
1407 let term = self.current_term();
1409 let mut log = self.log.write();
1410 let _index = log.append(term, Command::from_str("__membership_joint__"));
1411
1412 Ok(())
1413 }
1414
1415 pub fn commit_membership_change(&self) -> RaftResult<()> {
1420 let mut cs = self.config_state.write();
1421 let new_config = match &*cs {
1422 ConfigState::Joint { new, .. } => new.clone(),
1423 ConfigState::Stable(_) => {
1424 return Ok(());
1426 }
1427 };
1428
1429 *cs = ConfigState::Stable(new_config.clone());
1430
1431 info!(
1432 node_id = self.node_id(),
1433 version = new_config.version(),
1434 members = ?new_config.member_ids(),
1435 "Committed membership change, now stable"
1436 );
1437
1438 self.update_leader_state_for_config(&cs);
1440
1441 let term = self.current_term();
1443 let mut log = self.log.write();
1444 let _index = log.append(term, Command::from_str("__membership_stable__"));
1445
1446 if !new_config.contains(self.node_id()) {
1448 drop(cs);
1449 drop(log);
1450 self.step_down();
1451 }
1452
1453 Ok(())
1454 }
1455
1456 pub fn has_quorum(&self, responding_nodes: &HashSet<NodeId>) -> bool {
1459 self.config_state.read().has_quorum(responding_nodes)
1460 }
1461
1462 pub fn is_stepping_down(&self) -> bool {
1464 *self.stepping_down.read()
1465 }
1466
    /// Transition to follower and clear all leadership/candidacy state;
    /// invoked when this node has been removed from the cluster config.
    ///
    /// Lock order matters here: the `volatile` write guard is held across
    /// the writes to `leader_state`, `candidate_state`, and
    /// `stepping_down`, so observers never see a half-completed
    /// transition. Do not reorder these acquisitions.
    fn step_down(&self) {
        let mut volatile = self.volatile.write();
        volatile.become_follower(None);
        *self.leader_state.write() = None;
        *self.candidate_state.write() = None;
        *self.stepping_down.write() = true;

        info!(
            node_id = self.node_id(),
            "Stepping down -- removed from cluster"
        );
    }
1480
1481 fn update_leader_state_for_config(&self, cs: &ConfigState) {
1484 let mut ls_guard = self.leader_state.write();
1485 let ls = match ls_guard.as_mut() {
1486 Some(s) => s,
1487 None => return,
1488 };
1489
1490 let all_ids = cs.all_member_ids();
1491 let log = self.log.read();
1492 let last_log_index = log.last_index();
1493
1494 for &id in &all_ids {
1496 if id == self.node_id() {
1497 continue;
1498 }
1499 ls.next_index.entry(id).or_insert(last_log_index + 1);
1500 ls.match_index.entry(id).or_insert(0);
1501 }
1502
1503 ls.next_index
1505 .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1506 ls.match_index
1507 .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1508 }
1509
    /// Whether the randomized election timeout has elapsed since the last
    /// observed heartbeat.
    ///
    /// NOTE(review): `random_election_timeout()` appears to draw a fresh
    /// random timeout on every call, so repeated polls compare against
    /// different deadlines -- confirm this is intentional rather than
    /// fixing one randomized timeout per election cycle.
    pub fn election_timeout_elapsed(&self) -> bool {
        // Copy the Instant out so the lock is not held during comparison.
        let last_heartbeat = *self.last_heartbeat.read();
        let timeout = self.config.random_election_timeout();
        last_heartbeat.elapsed() >= timeout
    }
1516
1517 pub fn reset_election_timer(&self) {
1519 *self.last_heartbeat.write() = Instant::now();
1520 }
1521
1522 pub fn get_leader_hint(&self) -> Option<NodeId> {
1524 self.volatile.read().leader_id
1525 }
1526
1527 pub fn trigger_failover_election(&self) -> Vec<RequestVoteRequest> {
1531 let state = self.volatile.read().node_state;
1532 if state != NodeState::Follower {
1533 return Vec::new();
1534 }
1535 self.start_election()
1536 }
1537}
1538
1539fn replay_wal_into_log(wal_dir: &std::path::Path, log: &mut RaftLog) -> RaftResult<()> {
1555 let reader = WalReader::new(wal_dir);
1556 let (wal_entries, diag) = reader.recover_with_policy(CorruptionPolicy::TruncateToLastGood)?;
1557
1558 if diag.corrupt_entries > 0 || diag.truncated_segments > 0 {
1559 warn!(
1560 corrupt_entries = diag.corrupt_entries,
1561 truncated_segments = diag.truncated_segments,
1562 valid_entries = diag.valid_entries,
1563 "WAL replay: corruption/truncation detected"
1564 );
1565 }
1566
1567 if wal_entries.is_empty() {
1568 info!(wal_dir = %wal_dir.display(), "WAL replay: no entries to recover");
1569 return Ok(());
1570 }
1571
1572 let current_last = log.last_index();
1573 let new_entries: Vec<LogEntry> = wal_entries
1574 .into_iter()
1575 .filter(|e| e.index > current_last)
1576 .collect();
1577
1578 if new_entries.is_empty() {
1579 info!(
1580 wal_dir = %wal_dir.display(),
1581 current_last,
1582 "WAL replay: all WAL entries already present in log"
1583 );
1584 return Ok(());
1585 }
1586
1587 let replayed_count = new_entries.len();
1588 let first_new = new_entries[0].index;
1589 let last_new = new_entries[new_entries.len() - 1].index;
1590
1591 log.append_entries(new_entries)?;
1592
1593 info!(
1594 wal_dir = %wal_dir.display(),
1595 replayed_count,
1596 first_new,
1597 last_new,
1598 new_last_index = log.last_index(),
1599 "WAL replay complete"
1600 );
1601
1602 Ok(())
1603}
1604
1605#[cfg(test)]
1606#[path = "node_tests.rs"]
1607mod tests;