
amaters_cluster/node.rs

1//! Main Raft node implementation
2
3use crate::error::{RaftError, RaftResult};
4use crate::log::{Command, LogEntry, RaftLog};
5use crate::persistence::{FilePersistence, RaftPersistence};
6use crate::rpc::{
7    AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
8};
9use crate::snapshot::{
10    InstallSnapshotRequest, InstallSnapshotResponse, Snapshot, SnapshotConfig, SnapshotManager,
11    SnapshotPolicy, SnapshotReceiver,
12};
13use crate::state::FencingTokenState;
14use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
15use crate::types::{
16    ClusterConfig, ConfigState, FencingToken, LogIndex, MembershipChange, NodeId, NodeState,
17    RaftConfig, Term,
18};
19use crate::wal::{CorruptionPolicy, WalReader};
20use parking_lot::RwLock;
21use std::collections::HashSet;
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::{Duration, Instant};
25use tracing::{debug, info, warn};
26
27/// A Raft consensus node
28pub struct RaftNode {
29    /// Node configuration
30    config: Arc<RaftConfig>,
31    /// Persistent state
32    persistent: Arc<RwLock<PersistentState>>,
33    /// Volatile state
34    volatile: Arc<RwLock<VolatileState>>,
35    /// Raft log
36    log: Arc<RwLock<RaftLog>>,
37    /// Leader-specific state
38    leader_state: Arc<RwLock<Option<LeaderState>>>,
39    /// Candidate-specific state
40    candidate_state: Arc<RwLock<Option<CandidateState>>>,
41    /// Last time we received a message from the leader
42    last_heartbeat: Arc<RwLock<Instant>>,
43    /// Snapshot manager for creating and loading snapshots
44    snapshot_manager: Arc<RwLock<Option<SnapshotManager>>>,
45    /// Receiver for chunked snapshot transfers from the leader
46    snapshot_receiver: Arc<RwLock<Option<SnapshotReceiver>>>,
47    /// Optional persistent storage backend
48    persistence: Option<Arc<dyn RaftPersistence>>,
49    /// Dynamic cluster membership state (joint consensus)
50    config_state: Arc<RwLock<ConfigState>>,
51    /// Whether this node has been removed and should step down
52    stepping_down: Arc<RwLock<bool>>,
53    /// Packed fencing token state (atomic, lock-free reads)
54    fencing_token_state: Arc<FencingTokenState>,
55    /// True while WAL replay is in progress; RPCs are rejected during this window
56    is_recovering: Arc<AtomicBool>,
57}
58
59impl RaftNode {
60    /// Create a new Raft node
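    ///
    /// Minimal construction sketch (illustrative; assumes `RaftConfig` exposes a
    /// `Default` implementation and that the defaulted values pass `validate()`):
    /// ```rust,ignore
    /// let mut config = RaftConfig::default(); // assumption: Default impl exists
    /// config.node_id = 1;
    /// config.peers = vec![1, 2, 3];
    /// let node = RaftNode::new(config)?;
    /// assert!(!node.is_leader()); // a fresh node starts as a follower
    /// ```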
61    pub fn new(config: RaftConfig) -> RaftResult<Self> {
62        // Validate configuration
63        config
64            .validate()
65            .map_err(|msg| RaftError::ConfigError { message: msg })?;
66
67        // Initialize snapshot manager if snapshot directory is configured
68        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
69            let snap_config =
70                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
71            Some(SnapshotManager::new(snap_config)?)
72        } else {
73            None
74        };
75
76        // Initialize persistence backend if persistence_dir is configured
77        let persistence: Option<Arc<dyn RaftPersistence>> =
78            if let Some(ref dir) = config.persistence_dir {
79                Some(Arc::new(FilePersistence::new(dir, config.sync_on_write)?))
80            } else {
81                None
82            };
83
84        // If persistence is available, recover state from disk
85        let (persistent_state, mut raft_log) = if let Some(ref p) = persistence {
86            let (term, voted_for) = p.load_state()?;
87            let mut ps = PersistentState::new();
88            ps.current_term = term;
89            ps.voted_for = voted_for;
90
91            let entries = p.load_log()?;
92            let mut log = RaftLog::new();
93            if !entries.is_empty() {
94                log.append_entries(entries)?;
95            }
96
97            // Restore applied_index from persistence
98            let applied_idx = p.load_applied_index()?;
99            if applied_idx > 0 && applied_idx <= log.last_index() {
100                // Restore commit_index to applied_index (safe lower bound on recovery)
101                if let Err(e) = log.set_commit_index(applied_idx) {
102                    warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
103                } else if let Err(e) = log.set_applied_index(applied_idx) {
104                    warn!(applied_idx, error = ?e, "Failed to restore applied index");
105                }
106            }
107
108            info!(
109                node_id = config.node_id,
110                term = term,
111                voted_for = ?voted_for,
112                last_log_index = log.last_index(),
113                "Recovered state from persistence"
114            );
115
116            (ps, log)
117        } else {
118            (PersistentState::new(), RaftLog::new())
119        };
120
121        // Replay WAL entries if wal_dir is configured.
122        // The `is_recovering` flag is set to true for the duration of replay
123        // so that any concurrent RPC handlers can reject requests gracefully.
124        let is_recovering = Arc::new(AtomicBool::new(false));
125        if let Some(ref wal_dir) = config.wal_dir {
126            is_recovering.store(true, Ordering::Release);
127            let result = replay_wal_into_log(wal_dir, &mut raft_log);
128            is_recovering.store(false, Ordering::Release);
129            result?;
130        }
131
132        // Build initial stable config from peers (using empty addresses for now)
133        let initial_members: Vec<(NodeId, String)> =
134            config.peers.iter().map(|&id| (id, String::new())).collect();
135        let config_state = ConfigState::new_stable(initial_members);
136
137        Ok(Self {
138            config: Arc::new(config),
139            persistent: Arc::new(RwLock::new(persistent_state)),
140            volatile: Arc::new(RwLock::new(VolatileState::new())),
141            log: Arc::new(RwLock::new(raft_log)),
142            leader_state: Arc::new(RwLock::new(None)),
143            candidate_state: Arc::new(RwLock::new(None)),
144            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
145            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
146            snapshot_receiver: Arc::new(RwLock::new(None)),
147            persistence,
148            config_state: Arc::new(RwLock::new(config_state)),
149            stepping_down: Arc::new(RwLock::new(false)),
150            fencing_token_state: Arc::new(FencingTokenState::new()),
151            is_recovering,
152        })
153    }
154
155    /// Create a new Raft node with an explicit persistence backend.
156    ///
157    /// Recovers state from the given persistence backend and uses it for all
158    /// subsequent state and log mutations.
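    ///
    /// Sketch of usage with the file-backed implementation from this crate
    /// (the path is a placeholder; the second argument maps to `sync_on_write`):
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// let persistence = Arc::new(FilePersistence::new("/var/lib/raft/node1", true)?);
    /// let node = RaftNode::with_persistence(config, persistence)?;
    /// ```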
159    pub fn with_persistence(
160        config: RaftConfig,
161        persistence: Arc<dyn RaftPersistence>,
162    ) -> RaftResult<Self> {
163        config
164            .validate()
165            .map_err(|msg| RaftError::ConfigError { message: msg })?;
166
167        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
168            let snap_config =
169                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
170            Some(SnapshotManager::new(snap_config)?)
171        } else {
172            None
173        };
174
175        let (term, voted_for) = persistence.load_state()?;
176        let mut ps = PersistentState::new();
177        ps.current_term = term;
178        ps.voted_for = voted_for;
179
180        let entries = persistence.load_log()?;
181        let mut raft_log = RaftLog::new();
182        if !entries.is_empty() {
183            raft_log.append_entries(entries)?;
184        }
185
186        // Restore applied_index from persistence
187        let applied_idx = persistence.load_applied_index()?;
188        if applied_idx > 0 && applied_idx <= raft_log.last_index() {
189            if let Err(e) = raft_log.set_commit_index(applied_idx) {
190                warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
191            } else if let Err(e) = raft_log.set_applied_index(applied_idx) {
192                warn!(applied_idx, error = ?e, "Failed to restore applied index");
193            }
194        }
195
196        info!(
197            node_id = config.node_id,
198            term = term,
199            voted_for = ?voted_for,
200            last_log_index = raft_log.last_index(),
201            "Recovered state via explicit persistence"
202        );
203
204        // Replay WAL entries if wal_dir is configured.
205        let is_recovering = Arc::new(AtomicBool::new(false));
206        if let Some(ref wal_dir) = config.wal_dir {
207            is_recovering.store(true, Ordering::Release);
208            let result = replay_wal_into_log(wal_dir, &mut raft_log);
209            is_recovering.store(false, Ordering::Release);
210            result?;
211        }
212
213        let initial_members: Vec<(NodeId, String)> =
214            config.peers.iter().map(|&id| (id, String::new())).collect();
215        let config_state = ConfigState::new_stable(initial_members);
216
217        Ok(Self {
218            config: Arc::new(config),
219            persistent: Arc::new(RwLock::new(ps)),
220            volatile: Arc::new(RwLock::new(VolatileState::new())),
221            log: Arc::new(RwLock::new(raft_log)),
222            leader_state: Arc::new(RwLock::new(None)),
223            candidate_state: Arc::new(RwLock::new(None)),
224            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
225            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
226            snapshot_receiver: Arc::new(RwLock::new(None)),
227            persistence: Some(persistence),
228            config_state: Arc::new(RwLock::new(config_state)),
229            stepping_down: Arc::new(RwLock::new(false)),
230            fencing_token_state: Arc::new(FencingTokenState::new()),
231            is_recovering,
232        })
233    }
234
235    /// Persist current term and voted_for to the storage backend (no-op if
236    /// persistence is not configured).
237    fn persist_state(&self, term: Term, voted_for: Option<NodeId>) {
238        if let Some(ref p) = self.persistence {
239            if let Err(e) = p.save_state(term, voted_for) {
240                warn!(node_id = self.node_id(), error = ?e, "Failed to persist state");
241            }
242        }
243    }
244
245    /// Persist log entries to the storage backend.
246    fn persist_log_entries(&self, entries: &[LogEntry]) {
247        if let Some(ref p) = self.persistence {
248            if let Err(e) = p.append_entries(entries) {
249                warn!(node_id = self.node_id(), error = ?e, "Failed to persist log entries");
250            }
251        }
252    }
253
254    /// Persist a log truncation to the storage backend.
255    fn persist_log_truncation(&self, from_index: LogIndex) {
256        if let Some(ref p) = self.persistence {
257            if let Err(e) = p.truncate_log_from(from_index) {
258                warn!(node_id = self.node_id(), error = ?e, "Failed to persist log truncation");
259            }
260        }
261    }
262
263    /// Get the current node ID
264    pub fn node_id(&self) -> NodeId {
265        self.config.node_id
266    }
267
268    /// Get the current term
269    pub fn current_term(&self) -> Term {
270        self.persistent.read().current_term
271    }
272
273    /// Get the current state
274    pub fn state(&self) -> NodeState {
275        self.volatile.read().node_state
276    }
277
278    /// Get the current leader ID
279    pub fn leader_id(&self) -> Option<NodeId> {
280        self.volatile.read().leader_id
281    }
282
283    /// Check if this node is the leader
284    pub fn is_leader(&self) -> bool {
285        self.volatile.read().is_leader()
286    }
287
288    /// Get the commit index
289    pub fn commit_index(&self) -> LogIndex {
290        self.log.read().commit_index()
291    }
292
293    /// Get the last log index
294    pub fn last_log_index(&self) -> LogIndex {
295        self.log.read().last_index()
296    }
297
298    /// Append a command to the log (leader only)
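    ///
    /// Sketch of a leader-side call site (`Command::from_str` is the constructor
    /// used elsewhere in this module; error handling is elided):
    /// ```rust,ignore
    /// if node.is_leader() {
    ///     let index = node.propose(Command::from_str("set x=1"))?;
    ///     // The entry at `index` is picked up by the next
    ///     // `create_replication_requests()` / `replicate_to_followers()` round.
    /// }
    /// ```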
299    pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
300        let volatile = self.volatile.read();
301        if !volatile.is_leader() {
302            return Err(RaftError::NotLeader {
303                leader_id: volatile.leader_id,
304            });
305        }
306        drop(volatile);
307
308        let term = self.current_term();
309        let mut log = self.log.write();
310        let index = log.append(term, command.clone());
311
312        // Persist the new entry
313        let entry = LogEntry::new(term, index, command);
314        self.persist_log_entries(&[entry]);
315
316        info!(
317            node_id = self.node_id(),
318            index = index,
319            term = term,
320            "Proposed new entry"
321        );
322
323        Ok(index)
324    }
325
326    /// Return `true` if the node is currently replaying its WAL on startup.
327    pub fn is_recovering(&self) -> bool {
328        self.is_recovering.load(Ordering::Acquire)
329    }
330
331    /// Handle a RequestVote RPC
332    pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
333        // Reject all RPCs during WAL replay to maintain safety.
334        if self.is_recovering.load(Ordering::Acquire) {
335            warn!(
336                node_id = self.node_id(),
337                candidate = req.candidate_id,
338                event = "rpc_rejected_recovering",
339                "Rejecting RequestVote: node is recovering from WAL"
340            );
341            return RequestVoteResponse::rejected(self.current_term());
342        }
343
344        let mut persistent = self.persistent.write();
345        let mut volatile = self.volatile.write();
346
347        debug!(
348            node_id = self.node_id(),
349            candidate = req.candidate_id,
350            term = req.term,
351            "Received RequestVote"
352        );
353
354        // Update term if necessary
355        if req.term > persistent.current_term {
356            let from_term = persistent.current_term;
357            persistent.update_term(req.term);
358            self.persist_state(persistent.current_term, persistent.voted_for);
359            volatile.become_follower(None);
360            *self.leader_state.write() = None;
361            *self.candidate_state.write() = None;
362            debug!(
363                node_id = self.node_id(),
364                from_term = from_term,
365                to_term = persistent.current_term,
366                "Stepped down to follower (higher term in RequestVote)"
367            );
368        }
369
370        // Reject if term is stale
371        if req.term < persistent.current_term {
372            warn!(
373                node_id = self.node_id(),
374                candidate = req.candidate_id,
375                current_term = persistent.current_term,
376                request_term = req.term,
377                "Rejecting vote: stale term"
378            );
379            return RequestVoteResponse::rejected(persistent.current_term);
380        }
381
382        // Check if we've already voted
383        if let Some(voted_for) = persistent.voted_for {
384            if voted_for != req.candidate_id {
385                warn!(
386                    node_id = self.node_id(),
387                    candidate = req.candidate_id,
388                    voted_for = voted_for,
389                    "Rejecting vote: already voted"
390                );
391                return RequestVoteResponse::rejected(persistent.current_term);
392            }
393        }
394
395        // Check if candidate's log is at least as up-to-date as ours
396        let log = self.log.read();
397        let our_last_index = log.last_index();
398        let our_last_term = log.last_term();
399
400        let log_ok = req.last_log_term > our_last_term
401            || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);
402
403        if !log_ok {
404            warn!(
405                node_id = self.node_id(),
406                candidate = req.candidate_id,
407                our_last_index = our_last_index,
408                our_last_term = our_last_term,
409                candidate_last_index = req.last_log_index,
410                candidate_last_term = req.last_log_term,
411                "Rejecting vote: candidate log not up-to-date"
412            );
413            return RequestVoteResponse::rejected(persistent.current_term);
414        }
415
416        // Grant vote
417        persistent.grant_vote(req.candidate_id);
418        self.persist_state(persistent.current_term, persistent.voted_for);
419        *self.last_heartbeat.write() = Instant::now();
420
421        info!(
422            node_id = self.node_id(),
423            candidate = req.candidate_id,
424            term = req.term,
425            "Granted vote"
426        );
427
428        RequestVoteResponse::granted(persistent.current_term)
429    }
430
431    /// Handle an AppendEntries RPC
432    pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
433        // Reject all RPCs during WAL replay to maintain safety.
434        if self.is_recovering.load(Ordering::Acquire) {
435            warn!(
436                node_id = self.node_id(),
437                leader = req.leader_id,
438                event = "rpc_rejected_recovering",
439                "Rejecting AppendEntries: node is recovering from WAL"
440            );
441            return AppendEntriesResponse::rejected(self.current_term());
442        }
443
444        let mut persistent = self.persistent.write();
445        let mut volatile = self.volatile.write();
446
447        debug!(
448            node_id = self.node_id(),
449            leader = req.leader_id,
450            term = req.term,
451            entries = req.entries.len(),
452            "Received AppendEntries"
453        );
454
455        // Update term if necessary
456        if req.term > persistent.current_term {
457            let from_term = persistent.current_term;
458            persistent.update_term(req.term);
459            self.persist_state(persistent.current_term, persistent.voted_for);
460            volatile.become_follower(Some(req.leader_id));
461            *self.leader_state.write() = None;
462            *self.candidate_state.write() = None;
463            debug!(
464                node_id = self.node_id(),
465                from_term = from_term,
466                to_term = persistent.current_term,
467                leader_id = req.leader_id,
468                "Stepped down to follower (higher term in AppendEntries)"
469            );
470        }
471
472        // Reject if term is stale
473        if req.term < persistent.current_term {
474            warn!(
475                node_id = self.node_id(),
476                leader = req.leader_id,
477                current_term = persistent.current_term,
478                request_term = req.term,
479                "Rejecting AppendEntries: stale term"
480            );
481            return AppendEntriesResponse::rejected(persistent.current_term);
482        }
483
484        // Update heartbeat and leader
485        *self.last_heartbeat.write() = Instant::now();
486        volatile.become_follower(Some(req.leader_id));
487        *self.candidate_state.write() = None;
488
489        drop(persistent);
490        drop(volatile);
491
492        // Handle the entries
493        let mut log = self.log.write();
494        let our_last_index = log.last_index();
495
496        // Check if we have the previous log entry
497        if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
498            // Find conflict index and term
499            let conflict_index = req.prev_log_index.min(our_last_index);
500            let conflict_term = log.get_term(conflict_index).unwrap_or(0);
501
502            warn!(
503                node_id = self.node_id(),
504                prev_log_index = req.prev_log_index,
505                prev_log_term = req.prev_log_term,
506                conflict_index = conflict_index,
507                conflict_term = conflict_term,
508                "Rejecting AppendEntries: log inconsistency"
509            );
510
511            return AppendEntriesResponse::failure(
512                self.current_term(),
513                our_last_index,
514                conflict_index,
515                conflict_term,
516            );
517        }
518
519        // Append entries if any
520        if !req.entries.is_empty() {
521            // Delete conflicting entries
522            let first_new_index = req.entries[0].index;
523            if first_new_index <= our_last_index {
524                if let Err(e) = log.truncate_from(first_new_index) {
525                    warn!(
526                        node_id = self.node_id(),
527                        error = ?e,
528                        "Failed to truncate log"
529                    );
530                    return AppendEntriesResponse::rejected(self.current_term());
531                }
532                self.persist_log_truncation(first_new_index);
533            }
534
535            // Persist before in-memory append
536            self.persist_log_entries(&req.entries);
537
538            // Append new entries
539            if let Err(e) = log.append_entries(req.entries) {
540                warn!(
541                    node_id = self.node_id(),
542                    error = ?e,
543                    "Failed to append entries"
544                );
545                return AppendEntriesResponse::rejected(self.current_term());
546            }
547        }
548
549        // Update commit index
550        if req.leader_commit > log.commit_index() {
551            let old_commit = log.commit_index();
552            let new_commit = req.leader_commit.min(log.last_index());
553            if let Err(e) = log.set_commit_index(new_commit) {
554                warn!(
555                    node_id = self.node_id(),
556                    error = ?e,
557                    "Failed to update commit index"
558                );
559            } else {
560                debug!(
561                    node_id = self.node_id(),
562                    old_commit_index = old_commit,
563                    new_commit_index = new_commit,
564                    "Updated commit index"
565                );
566            }
567        }
568
569        AppendEntriesResponse::success(self.current_term(), log.last_index())
570    }
571
572    /// Start an election (transition to candidate)
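    ///
    /// Sketch of a follower-side election round, assuming an `rpc_send` transport
    /// helper (as in the other examples in this module) and a `remote_peers` list
    /// that excludes this node:
    /// ```rust,ignore
    /// if node.election_timeout_elapsed() && !node.is_leader() {
    ///     let requests = node.start_election();
    ///     for (peer, req) in remote_peers.iter().copied().zip(requests) {
    ///         let resp = rpc_send(peer, req); // hypothetical transport
    ///         if node.handle_vote_response(peer, resp) {
    ///             break; // quorum reached; this node is now leader
    ///         }
    ///     }
    /// }
    /// ```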
573    pub fn start_election(&self) -> Vec<RequestVoteRequest> {
574        let mut persistent = self.persistent.write();
575        let mut volatile = self.volatile.write();
576
577        // Increment term and vote for self
578        persistent.current_term += 1;
579        persistent.grant_vote(self.node_id());
580
581        // Persist the new term and vote before responding
582        self.persist_state(persistent.current_term, persistent.voted_for);
583
584        // Transition to candidate
585        volatile.become_candidate();
586
587        // Initialize candidate state
588        *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
589
590        let term = persistent.current_term;
591        let log = self.log.read();
592        let last_log_index = log.last_index();
593        let last_log_term = log.last_term();
594
595        let _span =
596            tracing::info_span!("raft_election", node_id = self.node_id(), term = term).entered();
597
598        info!(
599            node_id = self.node_id(),
600            candidate_term = term,
601            log_length = last_log_index,
602            "Started election"
603        );
604
        // Build a RequestVote request for each remote peer
        self.config
            .peers
            .iter()
            .filter(|&&peer| peer != self.node_id())
            .map(|_| {
                RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
            })
            .collect()
614    }
615
616    /// Handle a vote response during election
617    pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
618        let should_become_leader = {
619            let mut persistent = self.persistent.write();
620            let mut volatile = self.volatile.write();
621
622            // Check if we're still a candidate
623            if !volatile.is_candidate() {
624                return false;
625            }
626
627            // Update term if necessary
628            if resp.term > persistent.current_term {
629                let from_term = persistent.current_term;
630                persistent.update_term(resp.term);
631                self.persist_state(persistent.current_term, persistent.voted_for);
632                volatile.become_follower(None);
633                *self.candidate_state.write() = None;
634                debug!(
635                    node_id = self.node_id(),
636                    from_term = from_term,
637                    to_term = persistent.current_term,
638                    "Stepped down to follower (higher term in vote response)"
639                );
640                return false;
641            }
642
643            // Ignore stale responses
644            if resp.term < persistent.current_term {
645                return false;
646            }
647
648            // Record vote if granted
649            if resp.vote_granted {
650                let mut candidate_state_guard = self.candidate_state.write();
651                if let Some(candidate_state) = candidate_state_guard.as_mut() {
652                    candidate_state.record_vote(from);
653
654                    info!(
655                        node_id = self.node_id(),
656                        from = from,
657                        votes = candidate_state.vote_count(),
658                        quorum = self.config.quorum_size(),
659                        "Received vote"
660                    );
661
662                    // Check if we have a quorum
663                    candidate_state.has_quorum(self.config.quorum_size())
664                } else {
665                    false
666                }
667            } else {
668                false
669            }
670        };
671
672        // Become leader outside of locks to prevent deadlock
673        if should_become_leader {
674            let votes = self
675                .candidate_state
676                .read()
677                .as_ref()
678                .map(|cs| cs.vote_count())
679                .unwrap_or(0);
680            info!(
681                node_id = self.node_id(),
682                term = self.current_term(),
683                votes_received = votes,
684                "Won election with quorum"
685            );
686            self.become_leader();
687            return true;
688        }
689
690        false
691    }
692
693    /// Transition to leader
694    fn become_leader(&self) {
695        let mut volatile = self.volatile.write();
696        volatile.become_leader();
697
698        let log = self.log.read();
699        let last_log_index = log.last_index();
700
701        // Initialize leader state
702        *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
703        *self.candidate_state.write() = None;
704
705        let persistent = self.persistent.read();
706        let term = persistent.current_term;
707
708        // Bump the packed fencing token to the new leader term (resets seq to 0).
709        self.fencing_token_state.bump_term_token(term as u32);
710
711        info!(
712            node_id = self.node_id(),
713            term,
714            voted_for = ?persistent.voted_for,
715            peer_count = self.config.peers.len(),
716            "Became leader"
717        );
718    }
719
720    /// Issue a new fencing token for the current leader term.
721    ///
722    /// Returns `None` if the node is not the current leader.
723    pub fn issue_fencing_token(&self) -> Option<FencingToken> {
724        if !self.volatile.read().is_leader() {
725            return None;
726        }
727        Some(self.fencing_token_state.issue_token())
728    }
729
730    /// Validate that `token` is not stale relative to the current Raft term.
731    ///
732    /// Returns `Ok(())` if the token's term matches the current Raft term.
733    /// Returns `Err(RaftError::StaleTerm)` if the token predates the current term.
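    ///
    /// Sketch of the fencing pattern (the external store and its API are
    /// hypothetical; only the token calls come from this type):
    /// ```rust,ignore
    /// // Leader side: attach a fresh token to an outgoing side-effecting write.
    /// if let Some(token) = node.issue_fencing_token() {
    ///     external_store.write(key, value, token); // hypothetical resource
    /// }
    /// // Validation side: refuse tokens minted under an older term.
    /// node.validate_fencing_token(&token)?; // Err(StaleTerm) if outdated
    /// ```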
734    pub fn validate_fencing_token(&self, token: &FencingToken) -> RaftResult<()> {
735        let current_term = self.current_term();
736        if token.term() as u64 == current_term {
737            Ok(())
738        } else {
739            Err(RaftError::StaleTerm {
740                current: current_term,
741                received: token.term() as u64,
742            })
743        }
744    }
745
746    /// Create heartbeat messages for all peers
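    ///
    /// Sketch of a leader heartbeat tick (the tick cadence and `rpc_send`
    /// transport belong to the surrounding runtime, not to this type):
    /// ```rust,ignore
    /// for (peer, req) in node.create_heartbeats() {
    ///     let resp = rpc_send(peer, req); // hypothetical transport
    ///     node.handle_replication_response(peer, resp)?;
    /// }
    /// ```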
747    pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
748        let volatile = self.volatile.read();
749        if !volatile.is_leader() {
750            return Vec::new();
751        }
752        drop(volatile);
753
754        let term = self.current_term();
755        let log = self.log.read();
756        let leader_commit = log.commit_index();
757
758        self.config
759            .peers
760            .iter()
761            .filter(|&&peer| peer != self.node_id())
762            .map(|&peer| {
763                let prev_log_index = log.last_index();
764                let prev_log_term = log.last_term();
765
766                let req = AppendEntriesRequest::heartbeat(
767                    term,
768                    self.node_id(),
769                    prev_log_index,
770                    prev_log_term,
771                    leader_commit,
772                );
773
774                (peer, req)
775            })
776            .collect()
777    }
778
779    /// Create replication messages for all peers
780    pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
781        let volatile = self.volatile.read();
782        if !volatile.is_leader() {
783            return Vec::new();
784        }
785        drop(volatile);
786
787        let leader_state_guard = self.leader_state.read();
788        let leader_state = match leader_state_guard.as_ref() {
789            Some(state) => state,
790            None => return Vec::new(),
791        };
792
793        let term = self.current_term();
794        let log = self.log.read();
795        let leader_commit = log.commit_index();
796
797        self.config
798            .peers
799            .iter()
800            .filter(|&&peer| peer != self.node_id())
801            .filter_map(|&peer| {
802                let next_index = leader_state.get_next_index(peer);
803
804                if next_index > log.last_index() {
805                    return None;
806                }
807
808                let prev_log_index = next_index.saturating_sub(1);
809                let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
810
811                let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
812
813                if entries.is_empty() {
814                    return None;
815                }
816
817                let req = AppendEntriesRequest::new(
818                    term,
819                    self.node_id(),
820                    prev_log_index,
821                    prev_log_term,
822                    entries,
823                    leader_commit,
824                );
825
826                Some((peer, req))
827            })
828            .collect()
829    }
830
831    /// Replicate log entries to all followers that need them.
832    ///
    /// This is a thin wrapper around `create_replication_requests()`; it
    /// returns the `(peer_id, request)` pairs the caller's transport should send.
    /// If a follower is fully caught up (its `next_index > last_log_index`), it
    /// is omitted -- use `create_heartbeats()` for idle keep-alive messages.
838    ///
839    /// Typical usage in a replication loop:
840    /// ```rust,ignore
841    /// let requests = leader.replicate_to_followers();
842    /// for (peer, req) in requests {
843    ///     let resp = rpc_send(peer, req);
844    ///     leader.handle_replication_response(peer, resp)?;
845    /// }
846    /// ```
847    pub fn replicate_to_followers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
848        self.create_replication_requests()
849    }
850
851    /// Create an AppendEntries request for a specific follower.
852    ///
853    /// Returns `None` if this node is not the leader, if the follower is
854    /// already fully caught up, or if leader state is unavailable.
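    ///
    /// Sketch of per-peer replication with a snapshot fallback for followers
    /// that have fallen behind the snapshot point (`rpc_send` /
    /// `rpc_send_snapshot` are hypothetical transport helpers):
    /// ```rust,ignore
    /// if node.follower_needs_snapshot(peer) {
    ///     if let Some(snap_req) = node.prepare_install_snapshot(peer)? {
    ///         rpc_send_snapshot(peer, snap_req);
    ///     }
    /// } else if let Some(req) = node.create_replication_request_for(peer) {
    ///     let resp = rpc_send(peer, req);
    ///     node.handle_replication_response(peer, resp)?;
    /// }
    /// ```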
855    pub fn create_replication_request_for(&self, peer: NodeId) -> Option<AppendEntriesRequest> {
856        let volatile = self.volatile.read();
857        if !volatile.is_leader() {
858            return None;
859        }
860        drop(volatile);
861
862        let leader_state_guard = self.leader_state.read();
863        let leader_state = leader_state_guard.as_ref()?;
864
865        let term = self.current_term();
866        let log = self.log.read();
867        let leader_commit = log.commit_index();
868
869        let next_index = leader_state.get_next_index(peer);
870
871        if next_index > log.last_index() {
872            // Peer is up-to-date; nothing to replicate
873            return None;
874        }
875
876        let prev_log_index = next_index.saturating_sub(1);
877        let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
878
879        let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
880
881        if entries.is_empty() {
882            return None;
883        }
884
885        Some(AppendEntriesRequest::new(
886            term,
887            self.node_id(),
888            prev_log_index,
889            prev_log_term,
890            entries,
891            leader_commit,
892        ))
893    }
894
895    /// Handle a replication response
896    pub fn handle_replication_response(
897        &self,
898        from: NodeId,
899        resp: AppendEntriesResponse,
900    ) -> RaftResult<()> {
901        let mut persistent = self.persistent.write();
902        let mut volatile = self.volatile.write();
903
904        // Check if we're still the leader
905        if !volatile.is_leader() {
906            return Ok(());
907        }
908
909        // Update term if necessary
910        if resp.term > persistent.current_term {
911            let from_term = persistent.current_term;
912            persistent.update_term(resp.term);
913            self.persist_state(persistent.current_term, persistent.voted_for);
914            volatile.become_follower(None);
915            *self.leader_state.write() = None;
916            info!(
917                node_id = self.node_id(),
918                from_term = from_term,
919                to_term = persistent.current_term,
                "Stepped down to follower (higher term in replication response)"
921            );
922            return Ok(());
923        }
924
925        drop(persistent);
926        drop(volatile);
927
928        let mut leader_state_guard = self.leader_state.write();
929        let leader_state = match leader_state_guard.as_mut() {
930            Some(state) => state,
931            None => return Ok(()),
932        };
933
934        if resp.success {
935            // Update match_index and next_index
936            leader_state.update_success(from, resp.last_log_index);
937
938            debug!(
939                node_id = self.node_id(),
940                peer = from,
941                match_index = resp.last_log_index,
942                "Replication successful"
943            );
944
945            // Try to advance commit index, using joint-consensus-aware
946            // calculation when a membership change is in progress.
947            let config_state = self.config_state.read().clone();
948            let new_commit = leader_state.calculate_commit_index_joint(
949                self.node_id(),
950                self.log.read().last_index(),
951                &config_state,
952            );
953
954            let mut log = self.log.write();
955            if new_commit > log.commit_index() {
956                // Only commit entries from the current term (Raft safety)
957                if let Some(term) = log.get_term(new_commit) {
958                    if term == self.current_term() {
959                        let old_commit = log.commit_index();
960                        log.set_commit_index(new_commit)?;
961                        info!(
962                            node_id = self.node_id(),
963                            old_commit_index = old_commit,
964                            new_commit_index = new_commit,
965                            "Advanced commit index"
966                        );
967                    }
968                }
969            }
970        } else {
971            // Replication failed -- use fast backup with conflict hints
972            // when available, otherwise simple decrement.
973            if resp.conflict_index.is_some() || resp.conflict_term.is_some() {
974                leader_state.update_failure_with_hint(
975                    from,
976                    resp.conflict_index,
977                    resp.conflict_term,
978                    resp.last_log_index,
979                );
980            } else {
981                leader_state.update_failure(from);
982            }
983
984            warn!(
985                node_id = self.node_id(),
986                peer = from,
987                next_index = leader_state.get_next_index(from),
988                conflict_index = ?resp.conflict_index,
989                conflict_term = ?resp.conflict_term,
990                "Replication failed, will retry with adjusted next_index"
991            );
992        }
993
994        Ok(())
995    }
996
997    /// Attempt to create a snapshot if the log has grown past the threshold
998    ///
999    /// Call this after advancing the commit index. If a snapshot is created,
1000    /// the log is compacted up to the snapshot point.
1001    ///
1002    /// `state_machine_data` is the serialized state of the application state machine
1003    /// at the current applied index.
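    ///
    /// Sketch of the intended call site (the state machine and its `serialize`
    /// method are application-defined assumptions):
    /// ```rust,ignore
    /// // After applying newly committed entries to the state machine:
    /// let data = state_machine.serialize()?; // hypothetical
    /// if node.maybe_create_snapshot(data)? {
    ///     // A snapshot was written and the log was compacted.
    /// }
    /// ```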
1004    pub fn maybe_create_snapshot(&self, state_machine_data: Vec<u8>) -> RaftResult<bool> {
1005        let mut snap_guard = self.snapshot_manager.write();
1006        let manager = match snap_guard.as_mut() {
1007            Some(m) => m,
1008            None => return Ok(false),
1009        };
1010
1011        let log = self.log.read();
1012        let entries_since = log.entries_since_snapshot();
1013
1014        if !manager.should_snapshot(entries_since) {
1015            return Ok(false);
1016        }
1017
1018        let applied_index = log.applied_index();
1019        if applied_index == 0 {
1020            return Ok(false);
1021        }
1022
1023        let applied_term = match log.get_term(applied_index) {
1024            Some(t) => t,
1025            None => {
1026                // The applied entry might already be compacted; use snapshot term
1027                let (snap_idx, snap_term) = log.get_snapshot_point();
1028                if applied_index == snap_idx {
1029                    snap_term
1030                } else {
1031                    return Err(RaftError::LogInconsistency {
1032                        reason: format!(
1033                            "Cannot determine term for applied index {}",
1034                            applied_index
1035                        ),
1036                    });
1037                }
1038            }
1039        };
1040
1041        drop(log);
1042
1043        manager.create_snapshot(state_machine_data, applied_index, applied_term)?;
1044
1045        // Compact the log
1046        let mut log = self.log.write();
1047        log.compact_until(applied_index, applied_term)?;
1048
1049        info!(
1050            node_id = self.node_id(),
1051            snapshot_index = applied_index,
1052            snapshot_term = applied_term,
1053            "Created snapshot and compacted log"
1054        );
1055
1056        Ok(true)
1057    }
1058
1059    /// Automatically create a snapshot if the log has grown past the configured threshold.
1060    ///
1061    /// Unlike `maybe_create_snapshot`, this method uses a `SnapshotPolicy` to decide
1062    /// whether to snapshot and takes a closure that produces state machine data on demand
1063    /// (avoiding the cost of serialization when no snapshot is needed).
1064    ///
1065    /// Call this after applying committed entries.
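    ///
    /// Sketch of usage (the policy value and the state-machine closure are
    /// assumptions; the closure only runs when the policy decides to snapshot):
    /// ```rust,ignore
    /// let created = node.auto_snapshot_if_needed(&policy, || {
    ///     state_machine.serialize() // hypothetical, returns RaftResult<Vec<u8>>
    /// })?;
    /// ```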
1066    pub fn auto_snapshot_if_needed<F>(
1067        &self,
1068        policy: &SnapshotPolicy,
1069        state_machine_data_fn: F,
1070    ) -> RaftResult<bool>
1071    where
1072        F: FnOnce() -> RaftResult<Vec<u8>>,
1073    {
1074        let log = self.log.read();
1075        let entries_since = log.entries_since_snapshot();
1076        let applied_index = log.applied_index();
1077
1078        if !policy.should_snapshot(entries_since, applied_index) {
1079            return Ok(false);
1080        }
1081
1082        if applied_index == 0 {
1083            return Ok(false);
1084        }
1085
1086        let applied_term = match log.get_term(applied_index) {
1087            Some(t) => t,
1088            None => {
1089                let (snap_idx, snap_term) = log.get_snapshot_point();
1090                if applied_index == snap_idx {
1091                    snap_term
1092                } else {
1093                    return Err(RaftError::LogInconsistency {
1094                        reason: format!(
1095                            "Cannot determine term for applied index {}",
1096                            applied_index
1097                        ),
1098                    });
1099                }
1100            }
1101        };
1102
1103        drop(log);
1104
1105        let data = state_machine_data_fn()?;
1106
1107        let mut snap_guard = self.snapshot_manager.write();
1108        let manager = match snap_guard.as_mut() {
1109            Some(m) => m,
1110            None => return Ok(false),
1111        };
1112
1113        manager.create_snapshot(data, applied_index, applied_term)?;
1114        drop(snap_guard);
1115
1116        let mut log = self.log.write();
1117        log.compact_until(applied_index, applied_term)?;
1118
1119        info!(
1120            node_id = self.node_id(),
1121            snapshot_index = applied_index,
1122            snapshot_term = applied_term,
1123            entries_compacted = entries_since,
1124            "Auto-snapshot triggered and log compacted"
1125        );
1126
1127        Ok(true)
1128    }
1129
1130    /// Handle an InstallSnapshot RPC from the leader
1131    ///
1132    /// Used when a follower is too far behind and the leader sends a snapshot
1133    /// instead of individual log entries.
1134    pub fn handle_install_snapshot(
1135        &self,
1136        req: InstallSnapshotRequest,
1137    ) -> RaftResult<InstallSnapshotResponse> {
1138        let mut persistent = self.persistent.write();
1139        let mut volatile = self.volatile.write();
1140
1141        debug!(
1142            node_id = self.node_id(),
1143            leader = req.leader_id,
1144            term = req.term,
1145            last_included_index = req.last_included_index,
1146            last_included_term = req.last_included_term,
1147            offset = req.offset,
1148            done = req.done,
1149            "Received InstallSnapshot"
1150        );
1151
1152        // Update term if necessary
1153        if req.term > persistent.current_term {
1154            let from_term = persistent.current_term;
1155            persistent.update_term(req.term);
1156            self.persist_state(persistent.current_term, persistent.voted_for);
1157            volatile.become_follower(Some(req.leader_id));
1158            *self.leader_state.write() = None;
1159            *self.candidate_state.write() = None;
1160            debug!(
1161                node_id = self.node_id(),
1162                from_term = from_term,
1163                to_term = persistent.current_term,
1164                leader_id = req.leader_id,
1165                "Stepped down to follower (higher term in InstallSnapshot)"
1166            );
1167        }
1168
1169        // Reject if term is stale
1170        if req.term < persistent.current_term {
1171            warn!(
1172                node_id = self.node_id(),
1173                leader = req.leader_id,
1174                current_term = persistent.current_term,
1175                request_term = req.term,
1176                "Rejecting InstallSnapshot: stale term"
1177            );
1178            return Ok(InstallSnapshotResponse::new(persistent.current_term));
1179        }
1180
1181        // Update heartbeat and leader
1182        *self.last_heartbeat.write() = Instant::now();
1183        volatile.become_follower(Some(req.leader_id));
1184        *self.candidate_state.write() = None;
1185
1186        let current_term = persistent.current_term;
1187        drop(persistent);
1188        drop(volatile);
1189
1190        // Handle chunked snapshot transfer
1191        let mut receiver_guard = self.snapshot_receiver.write();
1192
1193        // If this is a new snapshot transfer (offset 0), create a new receiver
1194        if req.offset == 0 {
1195            *receiver_guard = Some(SnapshotReceiver::new(
1196                req.last_included_index,
1197                req.last_included_term,
1198            ));
1199        }
1200
1201        let receiver = match receiver_guard.as_mut() {
1202            Some(r) => r,
1203            None => {
                // No active receiver and offset != 0 -- this is unexpected
1205                warn!(
1206                    node_id = self.node_id(),
1207                    offset = req.offset,
1208                    "Received non-initial snapshot chunk without active receiver"
1209                );
1210                return Ok(InstallSnapshotResponse::new(current_term));
1211            }
1212        };
1213
1214        // Feed the chunk to the receiver
1215        let completed = receiver.receive_chunk(&req)?;
1216
1217        if let Some(snapshot) = completed {
1218            // Clear receiver
1219            *receiver_guard = None;
1220            drop(receiver_guard);
1221
1222            // Install the snapshot
1223            let mut snap_guard = self.snapshot_manager.write();
1224            if let Some(manager) = snap_guard.as_mut() {
1225                manager.install_snapshot(snapshot)?;
1226            }
1227
1228            // Reset the log to match the snapshot
1229            let mut log = self.log.write();
1230            log.install_snapshot(req.last_included_index, req.last_included_term);
1231
1232            info!(
1233                node_id = self.node_id(),
1234                last_included_index = req.last_included_index,
1235                last_included_term = req.last_included_term,
1236                "Installed snapshot from leader"
1237            );
1238        }
1239
1240        Ok(InstallSnapshotResponse::new(current_term))
1241    }
1242
1243    /// Prepare an InstallSnapshot request for a follower that is too far behind
1244    ///
1245    /// This is called by the leader when a follower's next_index falls behind
1246    /// the snapshot point and log entries are no longer available.
1247    pub fn prepare_install_snapshot(
1248        &self,
1249        target_peer: NodeId,
1250    ) -> RaftResult<Option<InstallSnapshotRequest>> {
1251        let volatile = self.volatile.read();
1252        if !volatile.is_leader() {
1253            return Ok(None);
1254        }
1255        drop(volatile);
1256
1257        let snap_guard = self.snapshot_manager.read();
1258        let manager = match snap_guard.as_ref() {
1259            Some(m) => m,
1260            None => return Ok(None),
1261        };
1262
1263        let snapshot = match manager.load_latest()? {
1264            Some(s) => s,
1265            None => return Ok(None),
1266        };
1267
1268        // Check if the peer actually needs a snapshot
1269        let leader_state_guard = self.leader_state.read();
1270        if let Some(leader_state) = leader_state_guard.as_ref() {
1271            let next_index = leader_state.get_next_index(target_peer);
1272            let log = self.log.read();
1273            let (snap_idx, _) = log.get_snapshot_point();
1274
1275            if next_index > snap_idx {
                // Peer doesn't need a snapshot; normal replication will work
1277                return Ok(None);
1278            }
1279        }
1280
1281        let term = self.current_term();
1282        let req = InstallSnapshotRequest::new_complete(
1283            term,
1284            self.node_id(),
1285            snapshot.metadata.last_included_index,
1286            snapshot.metadata.last_included_term,
1287            snapshot.data,
1288        );
1289
1290        Ok(Some(req))
1291    }
1292
1293    /// Check if a follower needs a snapshot instead of normal log replication
1294    ///
1295    /// Returns true if the follower's next_index is at or before the snapshot point.
1296    pub fn follower_needs_snapshot(&self, peer: NodeId) -> bool {
1297        let leader_state_guard = self.leader_state.read();
1298        let leader_state = match leader_state_guard.as_ref() {
1299            Some(s) => s,
1300            None => return false,
1301        };
1302
1303        let next_index = leader_state.get_next_index(peer);
1304        let log = self.log.read();
1305        let (snap_idx, _) = log.get_snapshot_point();
1306
1307        snap_idx > 0 && next_index <= snap_idx
1308    }
1309
1310    // ── Membership change (joint consensus) ──────────────────────────
1311
1312    /// Add a node to the cluster via joint consensus.
1313    ///
1314    /// If this node is the leader the change is proposed immediately.
1315    /// Returns an error if a membership change is already in progress or
1316    /// the node is already a member.
1317    pub fn add_node(&self, node_id: NodeId, address: String) -> RaftResult<()> {
1318        self.propose_membership_change(MembershipChange::AddNode { node_id, address })
1319    }
1320
1321    /// Remove a node from the cluster via joint consensus.
1322    ///
1323    /// If the removed node is this node, it will step down after the
1324    /// configuration change commits.
1325    pub fn remove_node(&self, node_id: NodeId) -> RaftResult<()> {
1326        self.propose_membership_change(MembershipChange::RemoveNode { node_id })
1327    }
1328
1329    /// Get the current list of cluster members as `(node_id, address)` pairs.
1330    pub fn cluster_members(&self) -> Vec<(NodeId, String)> {
1331        self.config_state.read().all_members()
1332    }
1333
1334    /// Check whether the cluster is currently in joint consensus.
1335    pub fn is_in_joint_consensus(&self) -> bool {
1336        self.config_state.read().is_joint()
1337    }
1338
1339    /// Get the current membership configuration version.
1340    pub fn membership_version(&self) -> u64 {
1341        self.config_state.read().version()
1342    }
1343
1344    /// Propose a membership change (leader only).
1345    ///
1346    /// Implements the simplified Raft joint consensus protocol (Section 6):
1347    /// 1. Leader creates joint config C_{old,new} and replicates it.
1348    /// 2. Once C_{old,new} is committed the leader creates C_{new}.
1349    /// 3. During the joint state, decisions require a majority of **both**
1350    ///    the old and new configurations.
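    ///
    /// Sketch of the full flow on the leader (deciding *when* the joint-config
    /// entry has committed is left to the caller's replication loop):
    /// ```rust,ignore
    /// node.add_node(4, "10.0.0.4:9000".to_string())?; // enters C_{old,new}
    /// assert!(node.is_in_joint_consensus());
    /// // ... replicate until the joint-config entry is committed ...
    /// node.commit_membership_change()?; // now stable on C_{new}
    /// ```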
1351    pub fn propose_membership_change(&self, change: MembershipChange) -> RaftResult<()> {
1352        // Must be leader
1353        let volatile = self.volatile.read();
1354        if !volatile.is_leader() {
1355            return Err(RaftError::NotLeader {
1356                leader_id: volatile.leader_id,
1357            });
1358        }
1359        drop(volatile);
1360
1361        let mut cs = self.config_state.write();
1362
1363        // Only one membership change at a time
1364        if cs.is_joint() {
1365            return Err(RaftError::MembershipChangeInProgress);
1366        }
1367
1368        let current = match &*cs {
1369            ConfigState::Stable(c) => c.clone(),
1370            ConfigState::Joint { .. } => return Err(RaftError::MembershipChangeInProgress),
1371        };
1372
1373        // Build the new config
1374        let new_config = match &change {
1375            MembershipChange::AddNode { node_id, address } => {
1376                if current.contains(*node_id) {
1377                    return Err(RaftError::NodeAlreadyMember { node_id: *node_id });
1378                }
1379                current.with_added_member(*node_id, address.clone())
1380            }
1381            MembershipChange::RemoveNode { node_id } => {
1382                if !current.contains(*node_id) {
1383                    return Err(RaftError::NodeNotMember { node_id: *node_id });
1384                }
1385                current.without_member(*node_id)
1386            }
1387        };
1388
1389        // Enter joint consensus C_{old,new}
1390        *cs = ConfigState::Joint {
1391            old: current.clone(),
1392            new: new_config.clone(),
1393        };
1394
1395        info!(
1396            node_id = self.node_id(),
1397            change = ?change,
1398            old_version = current.version(),
1399            new_version = new_config.version(),
1400            "Entered joint consensus"
1401        );
1402
1403        // Update the peers list and leader state to include the union of
1404        // both configs so replication / heartbeats reach the new member.
1405        self.update_leader_state_for_config(&cs);
1406
1407        // Append a joint-config marker to the log so followers replicate it.
1408        let term = self.current_term();
1409        let mut log = self.log.write();
1410        let _index = log.append(term, Command::from_str("__membership_joint__"));
1411
1412        Ok(())
1413    }
1414
1415    /// Commit the joint consensus transition: move from C_{old,new} to C_{new}.
1416    ///
1417    /// Call this once the joint config entry has been committed (i.e. acknowledged
1418    /// by a quorum of **both** old and new configs).
1419    pub fn commit_membership_change(&self) -> RaftResult<()> {
1420        let mut cs = self.config_state.write();
1421        let new_config = match &*cs {
1422            ConfigState::Joint { new, .. } => new.clone(),
1423            ConfigState::Stable(_) => {
1424                // Nothing to do -- already stable
1425                return Ok(());
1426            }
1427        };
1428
1429        *cs = ConfigState::Stable(new_config.clone());
1430
1431        info!(
1432            node_id = self.node_id(),
1433            version = new_config.version(),
1434            members = ?new_config.member_ids(),
1435            "Committed membership change, now stable"
1436        );
1437
1438        // Update leader state to reflect the final config
1439        self.update_leader_state_for_config(&cs);
1440
1441        // Append a stable-config marker so followers learn about it
1442        let term = self.current_term();
1443        let mut log = self.log.write();
1444        let _index = log.append(term, Command::from_str("__membership_stable__"));
1445
1446        // If we (the leader) were removed, step down
1447        if !new_config.contains(self.node_id()) {
1448            drop(cs);
1449            drop(log);
1450            self.step_down();
1451        }
1452
1453        Ok(())
1454    }
1455
1456    /// Calculate whether a set of nodes forms a quorum under the current
1457    /// membership config (handles both stable and joint states).
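    ///
    /// Illustrative check (node IDs are placeholders):
    /// ```rust,ignore
    /// use std::collections::HashSet;
    ///
    /// let responding: HashSet<NodeId> = [1, 2, 3].into_iter().collect();
    /// if node.has_quorum(&responding) {
    ///     // A decision acknowledged by these nodes is safe to commit
    ///     // under the current (stable or joint) configuration.
    /// }
    /// ```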
1458    pub fn has_quorum(&self, responding_nodes: &HashSet<NodeId>) -> bool {
1459        self.config_state.read().has_quorum(responding_nodes)
1460    }
1461
1462    /// Check if this node is stepping down (has been removed from the cluster).
1463    pub fn is_stepping_down(&self) -> bool {
1464        *self.stepping_down.read()
1465    }
1466
1467    /// Gracefully step down: revert to follower and mark as stepping down.
1468    fn step_down(&self) {
1469        let mut volatile = self.volatile.write();
1470        volatile.become_follower(None);
1471        *self.leader_state.write() = None;
1472        *self.candidate_state.write() = None;
1473        *self.stepping_down.write() = true;
1474
1475        info!(
1476            node_id = self.node_id(),
1477            "Stepping down -- removed from cluster"
1478        );
1479    }
1480
1481    /// Synchronize leader replication state with the current config so
1482    /// new members receive heartbeats / log entries.
1483    fn update_leader_state_for_config(&self, cs: &ConfigState) {
1484        let mut ls_guard = self.leader_state.write();
1485        let ls = match ls_guard.as_mut() {
1486            Some(s) => s,
1487            None => return,
1488        };
1489
1490        let all_ids = cs.all_member_ids();
1491        let log = self.log.read();
1492        let last_log_index = log.last_index();
1493
1494        // Add entries for any new peers that are not yet tracked
1495        for &id in &all_ids {
1496            if id == self.node_id() {
1497                continue;
1498            }
1499            ls.next_index.entry(id).or_insert(last_log_index + 1);
1500            ls.match_index.entry(id).or_insert(0);
1501        }
1502
1503        // Remove entries for peers that are no longer in any config
1504        ls.next_index
1505            .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1506        ls.match_index
1507            .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1508    }
1509
1510    /// Check if election timeout has elapsed
1511    pub fn election_timeout_elapsed(&self) -> bool {
1512        let last_heartbeat = *self.last_heartbeat.read();
1513        let timeout = self.config.random_election_timeout();
1514        last_heartbeat.elapsed() >= timeout
1515    }
1516
1517    /// Reset election timer
1518    pub fn reset_election_timer(&self) {
1519        *self.last_heartbeat.write() = Instant::now();
1520    }
1521
1522    /// Get a hint about the current leader (for client redirection)
1523    pub fn get_leader_hint(&self) -> Option<NodeId> {
1524        self.volatile.read().leader_id
1525    }
1526
1527    /// Trigger a failover election if this node is a follower.
1528    /// Returns the vote requests to send to peers, or an empty vec
1529    /// if the node is not in follower state.
1530    pub fn trigger_failover_election(&self) -> Vec<RequestVoteRequest> {
1531        let state = self.volatile.read().node_state;
1532        if state != NodeState::Follower {
1533            return Vec::new();
1534        }
1535        self.start_election()
1536    }
1537}
1538
1539// ---------------------------------------------------------------------------
1540// WAL replay helper
1541// ---------------------------------------------------------------------------
1542
1543/// Replay WAL entries from `wal_dir` into `log`, merging with any entries
1544/// already present.
1545///
/// Strategy: WAL entries with indices greater than the current
/// `log.last_index()` are appended verbatim. Entries at or below the current
/// last index are skipped, since the persistence backend already covers them
/// (the WAL is treated as a superset of, or equal to, the persisted log).
/// Entries above the last index exist only in the WAL, so the WAL versions
/// are taken as authoritative.
///
/// Uses [`CorruptionPolicy::TruncateToLastGood`] for crash safety: a partial
/// final entry is discarded, and a warning is logged when corruption or
/// truncation is detected.
1554fn replay_wal_into_log(wal_dir: &std::path::Path, log: &mut RaftLog) -> RaftResult<()> {
1555    let reader = WalReader::new(wal_dir);
1556    let (wal_entries, diag) = reader.recover_with_policy(CorruptionPolicy::TruncateToLastGood)?;
1557
1558    if diag.corrupt_entries > 0 || diag.truncated_segments > 0 {
1559        warn!(
1560            corrupt_entries = diag.corrupt_entries,
1561            truncated_segments = diag.truncated_segments,
1562            valid_entries = diag.valid_entries,
1563            "WAL replay: corruption/truncation detected"
1564        );
1565    }
1566
1567    if wal_entries.is_empty() {
1568        info!(wal_dir = %wal_dir.display(), "WAL replay: no entries to recover");
1569        return Ok(());
1570    }
1571
1572    let current_last = log.last_index();
1573    let new_entries: Vec<LogEntry> = wal_entries
1574        .into_iter()
1575        .filter(|e| e.index > current_last)
1576        .collect();
1577
1578    if new_entries.is_empty() {
1579        info!(
1580            wal_dir = %wal_dir.display(),
1581            current_last,
1582            "WAL replay: all WAL entries already present in log"
1583        );
1584        return Ok(());
1585    }
1586
1587    let replayed_count = new_entries.len();
1588    let first_new = new_entries[0].index;
1589    let last_new = new_entries[new_entries.len() - 1].index;
1590
1591    log.append_entries(new_entries)?;
1592
1593    info!(
1594        wal_dir = %wal_dir.display(),
1595        replayed_count,
1596        first_new,
1597        last_new,
1598        new_last_index = log.last_index(),
1599        "WAL replay complete"
1600    );
1601
1602    Ok(())
1603}
1604
1605#[cfg(test)]
1606#[path = "node_tests.rs"]
1607mod tests;