Skip to main content

amaters_cluster/
node.rs

1//! Main Raft node implementation
2
3use crate::config::DynamicConfig;
4use crate::error::{RaftError, RaftResult};
5use crate::failover::{AlertEvent, AlertManager, FailoverConfig, FailoverCoordinator};
6use crate::log::{Command, LogEntry, RaftLog, StateMachine};
7use crate::persistence::{FilePersistence, RaftPersistence};
8use crate::rpc::{
9    AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
10};
11use crate::snapshot::{
12    InstallSnapshotRequest, InstallSnapshotResponse, Snapshot, SnapshotConfig, SnapshotManager,
13    SnapshotPolicy, SnapshotReceiver, SnapshotStreamReceiver,
14};
15use crate::state::FencingTokenState;
16use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
17use crate::types::{
18    ClusterConfig, ConfigState, FencingToken, HeartbeatConfig, LogIndex, MembershipChange, NodeId,
19    NodeState, RaftConfig, Term,
20};
21use crate::wal::{CorruptionPolicy, WalReader};
22use parking_lot::RwLock;
23use std::collections::HashSet;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
29/// A Raft consensus node
30pub struct RaftNode {
31    /// Node configuration
32    config: Arc<RaftConfig>,
33    /// Persistent state
34    persistent: Arc<RwLock<PersistentState>>,
35    /// Volatile state
36    volatile: Arc<RwLock<VolatileState>>,
37    /// Raft log
38    log: Arc<RwLock<RaftLog>>,
39    /// Leader-specific state
40    leader_state: Arc<RwLock<Option<LeaderState>>>,
41    /// Candidate-specific state
42    candidate_state: Arc<RwLock<Option<CandidateState>>>,
43    /// Last time we received a message from the leader
44    last_heartbeat: Arc<RwLock<Instant>>,
45    /// Snapshot manager for creating and loading snapshots
46    snapshot_manager: Arc<RwLock<Option<SnapshotManager>>>,
47    /// Receiver for chunked snapshot transfers from the leader
48    snapshot_receiver: Arc<RwLock<Option<SnapshotReceiver>>>,
49    /// Optional persistent storage backend
50    persistence: Option<Arc<dyn RaftPersistence>>,
51    /// Dynamic cluster membership state (joint consensus)
52    config_state: Arc<RwLock<ConfigState>>,
53    /// Whether this node has been removed and should step down
54    stepping_down: Arc<RwLock<bool>>,
55    /// Packed fencing token state (atomic, lock-free reads)
56    fencing_token_state: Arc<FencingTokenState>,
57    /// True while WAL replay is in progress; RPCs are rejected during this window
58    is_recovering: Arc<AtomicBool>,
59    /// Active snapshot streamers for outbound chunked snapshot transfers.
60    ///
61    /// Keyed by target peer `NodeId`. Entries are created the first time a
62    /// large snapshot transfer is initiated for a peer and removed when the
63    /// final chunk is delivered or when this node steps down.
64    snapshot_streamers:
65        Arc<RwLock<std::collections::HashMap<NodeId, crate::snapshot::SnapshotStreamer>>>,
66    /// Per-sender streaming snapshot receivers for inbound chunked snapshot transfers.
67    ///
68    /// Keyed by sender `NodeId`. Entries are created on the first chunk from a sender
69    /// and removed on completion or error. Cleared when node steps down.
70    snapshot_stream_receivers:
71        Arc<RwLock<std::collections::HashMap<NodeId, SnapshotStreamReceiver>>>,
72    /// Hot-reloadable configuration subset.
73    ///
74    /// The Raft event loop reads `heartbeat_interval_ms` and
75    /// `compaction_threshold` from this value so that operator changes take
76    /// effect on the next tick without a full restart.
77    ///
78    /// Fields that require restart (bind_addr, node_id, peers, …) are NOT
79    /// present here; see [`crate::config::NodeConfig`] for documentation on
80    /// which fields fall into each category.
81    pub dynamic_config: Arc<RwLock<DynamicConfig>>,
82    /// Failover coordinator for leader failure detection and client redirects.
83    ///
84    /// Tracks which node is the current leader and detects leader failures via
85    /// heartbeat timeout.  The coordinator is seeded with the cluster peers
86    /// at construction time.
87    pub failover_coordinator: Arc<RwLock<FailoverCoordinator>>,
88    /// Optional placement scheduler handle.
89    ///
90    /// Populated via [`attach_placement_scheduler`][Self::attach_placement_scheduler].
91    /// Cleared (and the scheduler stopped) whenever the node steps down.
92    placement_scheduler_handle:
93        Arc<RwLock<Option<crate::placement_scheduler::PlacementSchedulerHandle>>>,
94    /// Optional state machine driven by committed log entries.
95    ///
96    /// Set via [`set_state_machine`][Self::set_state_machine].  Entries are
97    /// applied automatically after the commit index advances.
98    state_machine: Arc<parking_lot::Mutex<Option<Box<dyn StateMachine>>>>,
99    /// Optional alerting manager for cluster events (leader change, quorum
100    /// loss, slow replication, …).
101    ///
102    /// Set via [`set_alert_manager`][Self::set_alert_manager].
103    alert_manager: Option<Arc<AlertManager>>,
104}
105
106impl RaftNode {
107    /// Create a new Raft node
108    pub fn new(config: RaftConfig) -> RaftResult<Self> {
109        // Validate configuration
110        config
111            .validate()
112            .map_err(|msg| RaftError::ConfigError { message: msg })?;
113
114        // Initialize snapshot manager if snapshot directory is configured
115        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
116            let snap_config =
117                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
118            Some(SnapshotManager::new(snap_config)?)
119        } else {
120            None
121        };
122
123        // Initialize persistence backend if persistence_dir is configured
124        let persistence: Option<Arc<dyn RaftPersistence>> =
125            if let Some(ref dir) = config.persistence_dir {
126                Some(Arc::new(FilePersistence::new(dir, config.sync_on_write)?))
127            } else {
128                None
129            };
130
131        // If persistence is available, recover state from disk
132        let (persistent_state, mut raft_log) = if let Some(ref p) = persistence {
133            let (term, voted_for) = p.load_state()?;
134            let mut ps = PersistentState::new();
135            ps.current_term = term;
136            ps.voted_for = voted_for;
137
138            let entries = p.load_log()?;
139            let mut log = RaftLog::new();
140            if !entries.is_empty() {
141                log.append_entries(entries)?;
142            }
143
144            // Restore applied_index from persistence
145            let applied_idx = p.load_applied_index()?;
146            if applied_idx > 0 && applied_idx <= log.last_index() {
147                // Restore commit_index to applied_index (safe lower bound on recovery)
148                if let Err(e) = log.set_commit_index(applied_idx) {
149                    warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
150                } else if let Err(e) = log.set_applied_index(applied_idx) {
151                    warn!(applied_idx, error = ?e, "Failed to restore applied index");
152                }
153            }
154
155            info!(
156                node_id = config.node_id,
157                term = term,
158                voted_for = ?voted_for,
159                last_log_index = log.last_index(),
160                "Recovered state from persistence"
161            );
162
163            (ps, log)
164        } else {
165            (PersistentState::new(), RaftLog::new())
166        };
167
168        // Replay WAL entries if wal_dir is configured.
169        // The `is_recovering` flag is set to true for the duration of replay
170        // so that any concurrent RPC handlers can reject requests gracefully.
171        let is_recovering = Arc::new(AtomicBool::new(false));
172        if let Some(ref wal_dir) = config.wal_dir {
173            is_recovering.store(true, Ordering::Release);
174            let result = replay_wal_into_log(wal_dir, &mut raft_log);
175            is_recovering.store(false, Ordering::Release);
176            result?;
177        }
178
179        // Build initial stable config from peers (using empty addresses for now)
180        let initial_members: Vec<(NodeId, String)> =
181            config.peers.iter().map(|&id| (id, String::new())).collect();
182        let config_state = ConfigState::new_stable(initial_members);
183
184        // Build failover coordinator seeded with all peers.
185        let hb_interval_ms = config.heartbeat_interval;
186        let failover_coordinator = {
187            let hb_cfg = HeartbeatConfig::new(hb_interval_ms, hb_interval_ms * 10, 3);
188            let mut coord =
189                FailoverCoordinator::new(hb_cfg, FailoverConfig::default(), config.node_id);
190            for &peer in config.peers.iter().filter(|&&p| p != config.node_id) {
191                // Ignore errors — new node may not be tracked yet
192                let _ = coord.track_peer(peer);
193            }
194            Arc::new(RwLock::new(coord))
195        };
196
197        Ok(Self {
198            config: Arc::new(config),
199            persistent: Arc::new(RwLock::new(persistent_state)),
200            volatile: Arc::new(RwLock::new(VolatileState::new())),
201            log: Arc::new(RwLock::new(raft_log)),
202            leader_state: Arc::new(RwLock::new(None)),
203            candidate_state: Arc::new(RwLock::new(None)),
204            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
205            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
206            snapshot_receiver: Arc::new(RwLock::new(None)),
207            snapshot_streamers: Arc::new(RwLock::new(std::collections::HashMap::new())),
208            snapshot_stream_receivers: Arc::new(RwLock::new(std::collections::HashMap::new())),
209            persistence,
210            config_state: Arc::new(RwLock::new(config_state)),
211            stepping_down: Arc::new(RwLock::new(false)),
212            fencing_token_state: Arc::new(FencingTokenState::new()),
213            is_recovering,
214            dynamic_config: Arc::new(RwLock::new(DynamicConfig {
215                heartbeat_interval_ms: hb_interval_ms,
216                compaction_threshold: 10_000,
217            })),
218            failover_coordinator,
219            placement_scheduler_handle: Arc::new(RwLock::new(None)),
220            state_machine: Arc::new(parking_lot::Mutex::new(None)),
221            alert_manager: None,
222        })
223    }
224
225    /// Create a new Raft node with an explicit persistence backend.
226    ///
227    /// Recovers state from the given persistence backend and uses it for all
228    /// subsequent state and log mutations.
229    pub fn with_persistence(
230        config: RaftConfig,
231        persistence: Arc<dyn RaftPersistence>,
232    ) -> RaftResult<Self> {
233        config
234            .validate()
235            .map_err(|msg| RaftError::ConfigError { message: msg })?;
236
237        let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
238            let snap_config =
239                SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
240            Some(SnapshotManager::new(snap_config)?)
241        } else {
242            None
243        };
244
245        let (term, voted_for) = persistence.load_state()?;
246        let mut ps = PersistentState::new();
247        ps.current_term = term;
248        ps.voted_for = voted_for;
249
250        let entries = persistence.load_log()?;
251        let mut raft_log = RaftLog::new();
252        if !entries.is_empty() {
253            raft_log.append_entries(entries)?;
254        }
255
256        // Restore applied_index from persistence
257        let applied_idx = persistence.load_applied_index()?;
258        if applied_idx > 0 && applied_idx <= raft_log.last_index() {
259            if let Err(e) = raft_log.set_commit_index(applied_idx) {
260                warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
261            } else if let Err(e) = raft_log.set_applied_index(applied_idx) {
262                warn!(applied_idx, error = ?e, "Failed to restore applied index");
263            }
264        }
265
266        info!(
267            node_id = config.node_id,
268            term = term,
269            voted_for = ?voted_for,
270            last_log_index = raft_log.last_index(),
271            "Recovered state via explicit persistence"
272        );
273
274        // Replay WAL entries if wal_dir is configured.
275        let is_recovering = Arc::new(AtomicBool::new(false));
276        if let Some(ref wal_dir) = config.wal_dir {
277            is_recovering.store(true, Ordering::Release);
278            let result = replay_wal_into_log(wal_dir, &mut raft_log);
279            is_recovering.store(false, Ordering::Release);
280            result?;
281        }
282
283        let initial_members: Vec<(NodeId, String)> =
284            config.peers.iter().map(|&id| (id, String::new())).collect();
285        let config_state = ConfigState::new_stable(initial_members);
286
287        // Build failover coordinator seeded with all peers.
288        let hb_interval_ms_wp = config.heartbeat_interval;
289        let failover_coordinator_wp = {
290            let hb_cfg = HeartbeatConfig::new(hb_interval_ms_wp, hb_interval_ms_wp * 10, 3);
291            let mut coord =
292                FailoverCoordinator::new(hb_cfg, FailoverConfig::default(), config.node_id);
293            for &peer in config.peers.iter().filter(|&&p| p != config.node_id) {
294                let _ = coord.track_peer(peer);
295            }
296            Arc::new(RwLock::new(coord))
297        };
298
299        Ok(Self {
300            config: Arc::new(config),
301            persistent: Arc::new(RwLock::new(ps)),
302            volatile: Arc::new(RwLock::new(VolatileState::new())),
303            log: Arc::new(RwLock::new(raft_log)),
304            leader_state: Arc::new(RwLock::new(None)),
305            candidate_state: Arc::new(RwLock::new(None)),
306            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
307            snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
308            snapshot_receiver: Arc::new(RwLock::new(None)),
309            snapshot_streamers: Arc::new(RwLock::new(std::collections::HashMap::new())),
310            snapshot_stream_receivers: Arc::new(RwLock::new(std::collections::HashMap::new())),
311            persistence: Some(persistence),
312            config_state: Arc::new(RwLock::new(config_state)),
313            stepping_down: Arc::new(RwLock::new(false)),
314            fencing_token_state: Arc::new(FencingTokenState::new()),
315            is_recovering,
316            dynamic_config: Arc::new(RwLock::new(DynamicConfig {
317                heartbeat_interval_ms: hb_interval_ms_wp,
318                compaction_threshold: 10_000,
319            })),
320            failover_coordinator: failover_coordinator_wp,
321            placement_scheduler_handle: Arc::new(RwLock::new(None)),
322            state_machine: Arc::new(parking_lot::Mutex::new(None)),
323            alert_manager: None,
324        })
325    }
326
327    /// Persist current term and voted_for to the storage backend (no-op if
328    /// persistence is not configured).
329    fn persist_state(&self, term: Term, voted_for: Option<NodeId>) {
330        if let Some(ref p) = self.persistence {
331            if let Err(e) = p.save_state(term, voted_for) {
332                warn!(node_id = self.node_id(), error = ?e, "Failed to persist state");
333            }
334        }
335    }
336
337    /// Persist log entries to the storage backend.
338    fn persist_log_entries(&self, entries: &[LogEntry]) {
339        if let Some(ref p) = self.persistence {
340            if let Err(e) = p.append_entries(entries) {
341                warn!(node_id = self.node_id(), error = ?e, "Failed to persist log entries");
342            }
343        }
344    }
345
346    /// Persist a log truncation to the storage backend.
347    fn persist_log_truncation(&self, from_index: LogIndex) {
348        if let Some(ref p) = self.persistence {
349            if let Err(e) = p.truncate_log_from(from_index) {
350                warn!(node_id = self.node_id(), error = ?e, "Failed to persist log truncation");
351            }
352        }
353    }
354
355    /// Get the current node ID
356    pub fn node_id(&self) -> NodeId {
357        self.config.node_id
358    }
359
360    /// Get the current term
361    pub fn current_term(&self) -> Term {
362        self.persistent.read().current_term
363    }
364
365    /// Get the current state
366    pub fn state(&self) -> NodeState {
367        self.volatile.read().node_state
368    }
369
370    /// Get the current leader ID
371    pub fn leader_id(&self) -> Option<NodeId> {
372        self.volatile.read().leader_id
373    }
374
375    /// Check if this node is the leader
376    pub fn is_leader(&self) -> bool {
377        self.volatile.read().is_leader()
378    }
379
380    /// Get the commit index
381    pub fn commit_index(&self) -> LogIndex {
382        self.log.read().commit_index()
383    }
384
385    /// Get the last log index
386    pub fn last_log_index(&self) -> LogIndex {
387        self.log.read().last_index()
388    }
389
390    /// Append a command to the log (leader only)
391    pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
392        let volatile = self.volatile.read();
393        if !volatile.is_leader() {
394            return Err(RaftError::NotLeader {
395                leader_id: volatile.leader_id,
396            });
397        }
398        drop(volatile);
399
400        let term = self.current_term();
401        let mut log = self.log.write();
402        let index = log.append(term, command.clone());
403
404        // Persist the new entry
405        let entry = LogEntry::new(term, index, command);
406        self.persist_log_entries(&[entry]);
407
408        info!(
409            node_id = self.node_id(),
410            index = index,
411            term = term,
412            "Proposed new entry"
413        );
414
415        Ok(index)
416    }
417
418    /// Return `true` if the node is currently replaying its WAL on startup.
419    pub fn is_recovering(&self) -> bool {
420        self.is_recovering.load(Ordering::Acquire)
421    }
422
423    /// Handle a RequestVote RPC
424    pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
425        // Reject all RPCs during WAL replay to maintain safety.
426        if self.is_recovering.load(Ordering::Acquire) {
427            warn!(
428                node_id = self.node_id(),
429                candidate = req.candidate_id,
430                event = "rpc_rejected_recovering",
431                "Rejecting RequestVote: node is recovering from WAL"
432            );
433            return RequestVoteResponse::rejected(self.current_term());
434        }
435
436        let mut persistent = self.persistent.write();
437        let mut volatile = self.volatile.write();
438
439        debug!(
440            node_id = self.node_id(),
441            candidate = req.candidate_id,
442            term = req.term,
443            "Received RequestVote"
444        );
445
446        // Update term if necessary
447        if req.term > persistent.current_term {
448            let from_term = persistent.current_term;
449            persistent.update_term(req.term);
450            self.persist_state(persistent.current_term, persistent.voted_for);
451            volatile.become_follower(None);
452            *self.leader_state.write() = None;
453            *self.candidate_state.write() = None;
454            crate::metrics::global().set_current_term(persistent.current_term);
455            debug!(
456                node_id = self.node_id(),
457                from_term = from_term,
458                to_term = persistent.current_term,
459                "Stepped down to follower (higher term in RequestVote)"
460            );
461        }
462
463        // Reject if term is stale
464        if req.term < persistent.current_term {
465            warn!(
466                node_id = self.node_id(),
467                candidate = req.candidate_id,
468                current_term = persistent.current_term,
469                request_term = req.term,
470                "Rejecting vote: stale term"
471            );
472            return RequestVoteResponse::rejected(persistent.current_term);
473        }
474
475        // Check if we've already voted
476        if let Some(voted_for) = persistent.voted_for {
477            if voted_for != req.candidate_id {
478                warn!(
479                    node_id = self.node_id(),
480                    candidate = req.candidate_id,
481                    voted_for = voted_for,
482                    "Rejecting vote: already voted"
483                );
484                return RequestVoteResponse::rejected(persistent.current_term);
485            }
486        }
487
488        // Check if candidate's log is at least as up-to-date as ours
489        let log = self.log.read();
490        let our_last_index = log.last_index();
491        let our_last_term = log.last_term();
492
493        let log_ok = req.last_log_term > our_last_term
494            || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);
495
496        if !log_ok {
497            warn!(
498                node_id = self.node_id(),
499                candidate = req.candidate_id,
500                our_last_index = our_last_index,
501                our_last_term = our_last_term,
502                candidate_last_index = req.last_log_index,
503                candidate_last_term = req.last_log_term,
504                "Rejecting vote: candidate log not up-to-date"
505            );
506            return RequestVoteResponse::rejected(persistent.current_term);
507        }
508
509        // Grant vote
510        persistent.grant_vote(req.candidate_id);
511        self.persist_state(persistent.current_term, persistent.voted_for);
512        *self.last_heartbeat.write() = Instant::now();
513
514        info!(
515            node_id = self.node_id(),
516            candidate = req.candidate_id,
517            term = req.term,
518            "Granted vote"
519        );
520
521        RequestVoteResponse::granted(persistent.current_term)
522    }
523
524    /// Handle an AppendEntries RPC
525    pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
526        // Reject all RPCs during WAL replay to maintain safety.
527        if self.is_recovering.load(Ordering::Acquire) {
528            warn!(
529                node_id = self.node_id(),
530                leader = req.leader_id,
531                event = "rpc_rejected_recovering",
532                "Rejecting AppendEntries: node is recovering from WAL"
533            );
534            return AppendEntriesResponse::rejected(self.current_term());
535        }
536
537        let mut persistent = self.persistent.write();
538        let mut volatile = self.volatile.write();
539
540        debug!(
541            node_id = self.node_id(),
542            leader = req.leader_id,
543            term = req.term,
544            entries = req.entries.len(),
545            "Received AppendEntries"
546        );
547
548        // Update term if necessary
549        if req.term > persistent.current_term {
550            let from_term = persistent.current_term;
551            persistent.update_term(req.term);
552            self.persist_state(persistent.current_term, persistent.voted_for);
553            volatile.become_follower(Some(req.leader_id));
554            *self.leader_state.write() = None;
555            *self.candidate_state.write() = None;
556            crate::metrics::global().set_current_term(persistent.current_term);
557            debug!(
558                node_id = self.node_id(),
559                from_term = from_term,
560                to_term = persistent.current_term,
561                leader_id = req.leader_id,
562                "Stepped down to follower (higher term in AppendEntries)"
563            );
564        }
565
566        // Reject if term is stale
567        if req.term < persistent.current_term {
568            warn!(
569                node_id = self.node_id(),
570                leader = req.leader_id,
571                current_term = persistent.current_term,
572                request_term = req.term,
573                "Rejecting AppendEntries: stale term"
574            );
575            return AppendEntriesResponse::rejected(persistent.current_term);
576        }
577
578        // Update heartbeat and leader
579        *self.last_heartbeat.write() = Instant::now();
580        volatile.become_follower(Some(req.leader_id));
581        *self.candidate_state.write() = None;
582
583        // Inform the failover coordinator of the current leader so it can
584        // redirect clients and reset its election timer.
585        self.failover_coordinator.write().set_leader(req.leader_id);
586
587        drop(persistent);
588        drop(volatile);
589
590        // Handle the entries
591        let mut log = self.log.write();
592        let our_last_index = log.last_index();
593
594        // Check if we have the previous log entry
595        if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
596            // Find conflict index and term
597            let conflict_index = req.prev_log_index.min(our_last_index);
598            let conflict_term = log.get_term(conflict_index).unwrap_or(0);
599
600            warn!(
601                node_id = self.node_id(),
602                prev_log_index = req.prev_log_index,
603                prev_log_term = req.prev_log_term,
604                conflict_index = conflict_index,
605                conflict_term = conflict_term,
606                "Rejecting AppendEntries: log inconsistency"
607            );
608
609            return AppendEntriesResponse::failure(
610                self.current_term(),
611                our_last_index,
612                conflict_index,
613                conflict_term,
614            );
615        }
616
617        // Append entries if any
618        if !req.entries.is_empty() {
619            // Delete conflicting entries
620            let first_new_index = req.entries[0].index;
621            if first_new_index <= our_last_index {
622                if let Err(e) = log.truncate_from(first_new_index) {
623                    warn!(
624                        node_id = self.node_id(),
625                        error = ?e,
626                        "Failed to truncate log"
627                    );
628                    return AppendEntriesResponse::rejected(self.current_term());
629                }
630                self.persist_log_truncation(first_new_index);
631            }
632
633            // Persist before in-memory append
634            self.persist_log_entries(&req.entries);
635
636            // Append new entries
637            if let Err(e) = log.append_entries(req.entries) {
638                warn!(
639                    node_id = self.node_id(),
640                    error = ?e,
641                    "Failed to append entries"
642                );
643                return AppendEntriesResponse::rejected(self.current_term());
644            }
645        }
646
647        // Update commit index
648        if req.leader_commit > log.commit_index() {
649            let old_commit = log.commit_index();
650            let new_commit = req.leader_commit.min(log.last_index());
651            if let Err(e) = log.set_commit_index(new_commit) {
652                warn!(
653                    node_id = self.node_id(),
654                    error = ?e,
655                    "Failed to update commit index"
656                );
657            } else {
658                crate::metrics::global().set_commit_index(new_commit);
659                crate::metrics::global().set_log_entry_count(log.last_index());
660                debug!(
661                    node_id = self.node_id(),
662                    old_commit_index = old_commit,
663                    new_commit_index = new_commit,
664                    "Updated commit index"
665                );
666                // Release the log write-lock before applying entries so that
667                // the state machine can call back into the node without deadlock.
668                drop(log);
669                if let Err(e) = self.apply_committed_entries() {
670                    warn!(node_id = self.node_id(), error = ?e, "Failed to apply committed entries");
671                }
672                return AppendEntriesResponse::success(self.current_term(), self.last_log_index());
673            }
674        }
675
676        AppendEntriesResponse::success(self.current_term(), log.last_index())
677    }
678
679    /// Start an election (transition to candidate)
680    pub fn start_election(&self) -> Vec<RequestVoteRequest> {
681        let mut persistent = self.persistent.write();
682        let mut volatile = self.volatile.write();
683
684        // Increment term and vote for self
685        persistent.current_term += 1;
686        persistent.grant_vote(self.node_id());
687
688        // Persist the new term and vote before responding
689        self.persist_state(persistent.current_term, persistent.voted_for);
690
691        // Transition to candidate
692        volatile.become_candidate();
693
694        // Initialize candidate state
695        *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
696
697        let term = persistent.current_term;
698        let log = self.log.read();
699        let last_log_index = log.last_index();
700        let last_log_term = log.last_term();
701
702        let _span =
703            tracing::info_span!("raft_election", node_id = self.node_id(), term = term).entered();
704
705        // Update metrics for the new election.
706        let metrics = crate::metrics::global();
707        metrics.set_current_term(term);
708        metrics.inc_elections_started();
709
710        info!(
711            node_id = self.node_id(),
712            candidate_term = term,
713            log_length = last_log_index,
714            "Started election"
715        );
716
717        // Send RequestVote to all peers
718        self.config
719            .peers
720            .iter()
721            .filter(|&&peer| peer != self.node_id())
722            .map(|&peer| {
723                RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
724            })
725            .collect()
726    }
727
728    /// Handle a vote response during election
729    pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
730        let should_become_leader = {
731            let mut persistent = self.persistent.write();
732            let mut volatile = self.volatile.write();
733
734            // Check if we're still a candidate
735            if !volatile.is_candidate() {
736                return false;
737            }
738
739            // Update term if necessary
740            if resp.term > persistent.current_term {
741                let from_term = persistent.current_term;
742                persistent.update_term(resp.term);
743                self.persist_state(persistent.current_term, persistent.voted_for);
744                volatile.become_follower(None);
745                *self.candidate_state.write() = None;
746                crate::metrics::global().set_current_term(persistent.current_term);
747                debug!(
748                    node_id = self.node_id(),
749                    from_term = from_term,
750                    to_term = persistent.current_term,
751                    "Stepped down to follower (higher term in vote response)"
752                );
753                return false;
754            }
755
756            // Ignore stale responses
757            if resp.term < persistent.current_term {
758                return false;
759            }
760
761            // Record vote if granted
762            if resp.vote_granted {
763                let mut candidate_state_guard = self.candidate_state.write();
764                if let Some(candidate_state) = candidate_state_guard.as_mut() {
765                    candidate_state.record_vote(from);
766
767                    info!(
768                        node_id = self.node_id(),
769                        from = from,
770                        votes = candidate_state.vote_count(),
771                        quorum = self.config.quorum_size(),
772                        "Received vote"
773                    );
774
775                    // Check if we have a quorum
776                    candidate_state.has_quorum(self.config.quorum_size())
777                } else {
778                    false
779                }
780            } else {
781                false
782            }
783        };
784
785        // Become leader outside of locks to prevent deadlock
786        if should_become_leader {
787            let votes = self
788                .candidate_state
789                .read()
790                .as_ref()
791                .map(|cs| cs.vote_count())
792                .unwrap_or(0);
793            info!(
794                node_id = self.node_id(),
795                term = self.current_term(),
796                votes_received = votes,
797                "Won election with quorum"
798            );
799            self.become_leader();
800            return true;
801        }
802
803        false
804    }
805
806    /// Transition to leader
807    fn become_leader(&self) {
808        let mut volatile = self.volatile.write();
809        volatile.become_leader();
810
811        let log = self.log.read();
812        let last_log_index = log.last_index();
813
814        // Initialize leader state
815        *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
816        *self.candidate_state.write() = None;
817
818        let persistent = self.persistent.read();
819        let term = persistent.current_term;
820
821        // Bump the packed fencing token to the new leader term (resets seq to 0).
822        self.fencing_token_state.bump_term_token(term as u32);
823
824        // Tell the failover coordinator we are now the leader.
825        let prev_leader = {
826            let mut fo = self.failover_coordinator.write();
827            let old = fo.leader_hint();
828            fo.set_leader(self.node_id());
829            old
830        };
831
832        // Update metrics: term and leader change counter.
833        let metrics = crate::metrics::global();
834        metrics.set_current_term(term);
835        metrics.inc_leader_changes();
836
837        // Emit an alerting event for the leader change.
838        if let Some(am) = &self.alert_manager {
839            am.emit(AlertEvent::LeaderChanged {
840                old_leader: prev_leader,
841                new_leader: self.node_id(),
842            });
843        }
844
845        info!(
846            node_id = self.node_id(),
847            term,
848            voted_for = ?persistent.voted_for,
849            peer_count = self.config.peers.len(),
850            "Became leader"
851        );
852    }
853
854    /// Issue a new fencing token for the current leader term.
855    ///
856    /// Returns `None` if the node is not the current leader.
857    pub fn issue_fencing_token(&self) -> Option<FencingToken> {
858        if !self.volatile.read().is_leader() {
859            return None;
860        }
861        Some(self.fencing_token_state.issue_token())
862    }
863
864    /// Validate that `token` is not stale relative to the current Raft term.
865    ///
866    /// Returns `Ok(())` if the token's term matches the current Raft term.
867    /// Returns `Err(RaftError::StaleTerm)` if the token predates the current term.
868    pub fn validate_fencing_token(&self, token: &FencingToken) -> RaftResult<()> {
869        let current_term = self.current_term();
870        if token.term() as u64 == current_term {
871            Ok(())
872        } else {
873            Err(RaftError::StaleTerm {
874                current: current_term,
875                received: token.term() as u64,
876            })
877        }
878    }
879
880    /// Create heartbeat messages for all peers
881    pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
882        let volatile = self.volatile.read();
883        if !volatile.is_leader() {
884            return Vec::new();
885        }
886        drop(volatile);
887
888        let term = self.current_term();
889        let log = self.log.read();
890        let leader_commit = log.commit_index();
891
892        self.config
893            .peers
894            .iter()
895            .filter(|&&peer| peer != self.node_id())
896            .map(|&peer| {
897                let prev_log_index = log.last_index();
898                let prev_log_term = log.last_term();
899
900                let req = AppendEntriesRequest::heartbeat(
901                    term,
902                    self.node_id(),
903                    prev_log_index,
904                    prev_log_term,
905                    leader_commit,
906                );
907
908                (peer, req)
909            })
910            .collect()
911    }
912
913    /// Create replication messages for all peers
914    pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
915        let volatile = self.volatile.read();
916        if !volatile.is_leader() {
917            return Vec::new();
918        }
919        drop(volatile);
920
921        let leader_state_guard = self.leader_state.read();
922        let leader_state = match leader_state_guard.as_ref() {
923            Some(state) => state,
924            None => return Vec::new(),
925        };
926
927        let term = self.current_term();
928        let log = self.log.read();
929        let leader_commit = log.commit_index();
930
931        self.config
932            .peers
933            .iter()
934            .filter(|&&peer| peer != self.node_id())
935            .filter_map(|&peer| {
936                let next_index = leader_state.get_next_index(peer);
937
938                if next_index > log.last_index() {
939                    return None;
940                }
941
942                let prev_log_index = next_index.saturating_sub(1);
943                let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
944
945                let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
946
947                if entries.is_empty() {
948                    return None;
949                }
950
951                let req = AppendEntriesRequest::new(
952                    term,
953                    self.node_id(),
954                    prev_log_index,
955                    prev_log_term,
956                    entries,
957                    leader_commit,
958                );
959
960                Some((peer, req))
961            })
962            .collect()
963    }
964
965    /// Replicate log entries to all followers that need them.
966    ///
967    /// This is a convenience method that combines `create_replication_requests()`
968    /// with the information the caller needs to actually send the RPCs.
969    /// Returns a list of `(peer_id, request)` pairs. If a follower is fully
970    /// caught up (its `next_index > last_log_index`), it is omitted -- use
971    /// `create_heartbeats()` for idle keep-alive messages.
972    ///
973    /// Typical usage in a replication loop:
974    /// ```rust,ignore
975    /// let requests = leader.replicate_to_followers();
976    /// for (peer, req) in requests {
977    ///     let resp = rpc_send(peer, req);
978    ///     leader.handle_replication_response(peer, resp)?;
979    /// }
980    /// ```
981    pub fn replicate_to_followers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
982        self.create_replication_requests()
983    }
984
985    /// Create an AppendEntries request for a specific follower.
986    ///
987    /// Returns `None` if this node is not the leader, if the follower is
988    /// already fully caught up, or if leader state is unavailable.
989    pub fn create_replication_request_for(&self, peer: NodeId) -> Option<AppendEntriesRequest> {
990        let volatile = self.volatile.read();
991        if !volatile.is_leader() {
992            return None;
993        }
994        drop(volatile);
995
996        let leader_state_guard = self.leader_state.read();
997        let leader_state = leader_state_guard.as_ref()?;
998
999        let term = self.current_term();
1000        let log = self.log.read();
1001        let leader_commit = log.commit_index();
1002
1003        let next_index = leader_state.get_next_index(peer);
1004
1005        if next_index > log.last_index() {
1006            // Peer is up-to-date; nothing to replicate
1007            return None;
1008        }
1009
1010        let prev_log_index = next_index.saturating_sub(1);
1011        let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
1012
1013        let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
1014
1015        if entries.is_empty() {
1016            return None;
1017        }
1018
1019        Some(AppendEntriesRequest::new(
1020            term,
1021            self.node_id(),
1022            prev_log_index,
1023            prev_log_term,
1024            entries,
1025            leader_commit,
1026        ))
1027    }
1028
1029    /// Handle a replication response
1030    pub fn handle_replication_response(
1031        &self,
1032        from: NodeId,
1033        resp: AppendEntriesResponse,
1034    ) -> RaftResult<()> {
1035        let mut persistent = self.persistent.write();
1036        let mut volatile = self.volatile.write();
1037
1038        // Check if we're still the leader
1039        if !volatile.is_leader() {
1040            return Ok(());
1041        }
1042
1043        // Update term if necessary
1044        if resp.term > persistent.current_term {
1045            let from_term = persistent.current_term;
1046            persistent.update_term(resp.term);
1047            self.persist_state(persistent.current_term, persistent.voted_for);
1048            volatile.become_follower(None);
1049            *self.leader_state.write() = None;
1050            self.snapshot_streamers.write().clear();
1051            crate::metrics::global().set_current_term(persistent.current_term);
1052            info!(
1053                node_id = self.node_id(),
1054                from_term = from_term,
1055                to_term = persistent.current_term,
1056                "Stepped down: leader to follower (higher term in replication response)"
1057            );
1058            return Ok(());
1059        }
1060
1061        drop(persistent);
1062        drop(volatile);
1063
1064        let mut leader_state_guard = self.leader_state.write();
1065        let leader_state = match leader_state_guard.as_mut() {
1066            Some(state) => state,
1067            None => return Ok(()),
1068        };
1069
1070        if resp.success {
1071            // Update match_index and next_index
1072            leader_state.update_success(from, resp.last_log_index);
1073
1074            debug!(
1075                node_id = self.node_id(),
1076                peer = from,
1077                match_index = resp.last_log_index,
1078                "Replication successful"
1079            );
1080
1081            // Emit SlowReplication alert if this follower is lagging.
1082            {
1083                const SLOW_REPLICATION_THRESHOLD: u64 = 100;
1084                let committed = self.log.read().commit_index();
1085                let follower_match = resp.last_log_index;
1086                if committed > follower_match {
1087                    let lag = committed - follower_match;
1088                    if lag > SLOW_REPLICATION_THRESHOLD {
1089                        if let Some(am) = &self.alert_manager {
1090                            am.emit(AlertEvent::SlowReplication {
1091                                follower: from,
1092                                lag_entries: lag,
1093                            });
1094                        }
1095                    }
1096                }
1097            }
1098
1099            // Try to advance commit index, using joint-consensus-aware
1100            // calculation when a membership change is in progress.
1101            let config_state = self.config_state.read().clone();
1102            let new_commit = leader_state.calculate_commit_index_joint(
1103                self.node_id(),
1104                self.log.read().last_index(),
1105                &config_state,
1106            );
1107
1108            let committed_advanced = {
1109                let mut log = self.log.write();
1110                if new_commit > log.commit_index() {
1111                    // Only commit entries from the current term (Raft safety)
1112                    if let Some(term) = log.get_term(new_commit) {
1113                        if term == self.current_term() {
1114                            let old_commit = log.commit_index();
1115                            log.set_commit_index(new_commit)?;
1116                            crate::metrics::global().set_commit_index(new_commit);
1117                            crate::metrics::global().set_log_entry_count(log.last_index());
1118                            info!(
1119                                node_id = self.node_id(),
1120                                old_commit_index = old_commit,
1121                                new_commit_index = new_commit,
1122                                "Advanced commit index"
1123                            );
1124                            true
1125                        } else {
1126                            false
1127                        }
1128                    } else {
1129                        false
1130                    }
1131                } else {
1132                    false
1133                }
1134            };
1135            if committed_advanced {
1136                if let Err(e) = self.apply_committed_entries() {
1137                    warn!(node_id = self.node_id(), error = ?e, "Failed to apply committed entries");
1138                }
1139            }
1140        } else {
1141            // Replication failed -- use fast backup with conflict hints
1142            // when available, otherwise simple decrement.
1143            if resp.conflict_index.is_some() || resp.conflict_term.is_some() {
1144                leader_state.update_failure_with_hint(
1145                    from,
1146                    resp.conflict_index,
1147                    resp.conflict_term,
1148                    resp.last_log_index,
1149                );
1150            } else {
1151                leader_state.update_failure(from);
1152            }
1153
1154            warn!(
1155                node_id = self.node_id(),
1156                peer = from,
1157                next_index = leader_state.get_next_index(from),
1158                conflict_index = ?resp.conflict_index,
1159                conflict_term = ?resp.conflict_term,
1160                "Replication failed, will retry with adjusted next_index"
1161            );
1162        }
1163
1164        Ok(())
1165    }
1166
1167    /// Attempt to create a snapshot if the log has grown past the threshold
1168    ///
1169    /// Call this after advancing the commit index. If a snapshot is created,
1170    /// the log is compacted up to the snapshot point.
1171    ///
1172    /// `state_machine_data` is the serialized state of the application state machine
1173    /// at the current applied index.
1174    pub fn maybe_create_snapshot(&self, state_machine_data: Vec<u8>) -> RaftResult<bool> {
1175        let mut snap_guard = self.snapshot_manager.write();
1176        let manager = match snap_guard.as_mut() {
1177            Some(m) => m,
1178            None => return Ok(false),
1179        };
1180
1181        let log = self.log.read();
1182        let entries_since = log.entries_since_snapshot();
1183
1184        if !manager.should_snapshot(entries_since) {
1185            return Ok(false);
1186        }
1187
1188        let applied_index = log.applied_index();
1189        if applied_index == 0 {
1190            return Ok(false);
1191        }
1192
1193        let applied_term = match log.get_term(applied_index) {
1194            Some(t) => t,
1195            None => {
1196                // The applied entry might already be compacted; use snapshot term
1197                let (snap_idx, snap_term) = log.get_snapshot_point();
1198                if applied_index == snap_idx {
1199                    snap_term
1200                } else {
1201                    return Err(RaftError::LogInconsistency {
1202                        reason: format!(
1203                            "Cannot determine term for applied index {}",
1204                            applied_index
1205                        ),
1206                    });
1207                }
1208            }
1209        };
1210
1211        drop(log);
1212
1213        manager.create_snapshot(state_machine_data, applied_index, applied_term)?;
1214
1215        // Compact the log
1216        let mut log = self.log.write();
1217        log.compact_until(applied_index, applied_term)?;
1218
1219        info!(
1220            node_id = self.node_id(),
1221            snapshot_index = applied_index,
1222            snapshot_term = applied_term,
1223            "Created snapshot and compacted log"
1224        );
1225
1226        Ok(true)
1227    }
1228
1229    /// Automatically create a snapshot if the log has grown past the configured threshold.
1230    ///
1231    /// Unlike `maybe_create_snapshot`, this method uses a `SnapshotPolicy` to decide
1232    /// whether to snapshot and takes a closure that produces state machine data on demand
1233    /// (avoiding the cost of serialization when no snapshot is needed).
1234    ///
1235    /// Call this after applying committed entries.
1236    pub fn auto_snapshot_if_needed<F>(
1237        &self,
1238        policy: &SnapshotPolicy,
1239        state_machine_data_fn: F,
1240    ) -> RaftResult<bool>
1241    where
1242        F: FnOnce() -> RaftResult<Vec<u8>>,
1243    {
1244        let log = self.log.read();
1245        let entries_since = log.entries_since_snapshot();
1246        let applied_index = log.applied_index();
1247
1248        if !policy.should_snapshot(entries_since, applied_index) {
1249            return Ok(false);
1250        }
1251
1252        if applied_index == 0 {
1253            return Ok(false);
1254        }
1255
1256        let applied_term = match log.get_term(applied_index) {
1257            Some(t) => t,
1258            None => {
1259                let (snap_idx, snap_term) = log.get_snapshot_point();
1260                if applied_index == snap_idx {
1261                    snap_term
1262                } else {
1263                    return Err(RaftError::LogInconsistency {
1264                        reason: format!(
1265                            "Cannot determine term for applied index {}",
1266                            applied_index
1267                        ),
1268                    });
1269                }
1270            }
1271        };
1272
1273        drop(log);
1274
1275        let data = state_machine_data_fn()?;
1276
1277        let mut snap_guard = self.snapshot_manager.write();
1278        let manager = match snap_guard.as_mut() {
1279            Some(m) => m,
1280            None => return Ok(false),
1281        };
1282
1283        manager.create_snapshot(data, applied_index, applied_term)?;
1284        drop(snap_guard);
1285
1286        let mut log = self.log.write();
1287        log.compact_until(applied_index, applied_term)?;
1288
1289        info!(
1290            node_id = self.node_id(),
1291            snapshot_index = applied_index,
1292            snapshot_term = applied_term,
1293            entries_compacted = entries_since,
1294            "Auto-snapshot triggered and log compacted"
1295        );
1296
1297        Ok(true)
1298    }
1299
1300    /// Handle an InstallSnapshot RPC from the leader
1301    ///
1302    /// Used when a follower is too far behind and the leader sends a snapshot
1303    /// instead of individual log entries.
1304    pub fn handle_install_snapshot(
1305        &self,
1306        req: InstallSnapshotRequest,
1307    ) -> RaftResult<InstallSnapshotResponse> {
1308        let mut persistent = self.persistent.write();
1309        let mut volatile = self.volatile.write();
1310
1311        debug!(
1312            node_id = self.node_id(),
1313            leader = req.leader_id,
1314            term = req.term,
1315            last_included_index = req.last_included_index,
1316            last_included_term = req.last_included_term,
1317            offset = req.offset,
1318            done = req.done,
1319            "Received InstallSnapshot"
1320        );
1321
1322        // Update term if necessary
1323        if req.term > persistent.current_term {
1324            let from_term = persistent.current_term;
1325            persistent.update_term(req.term);
1326            self.persist_state(persistent.current_term, persistent.voted_for);
1327            volatile.become_follower(Some(req.leader_id));
1328            *self.leader_state.write() = None;
1329            *self.candidate_state.write() = None;
1330            self.snapshot_streamers.write().clear();
1331            self.snapshot_stream_receivers.write().clear();
1332            debug!(
1333                node_id = self.node_id(),
1334                from_term = from_term,
1335                to_term = persistent.current_term,
1336                leader_id = req.leader_id,
1337                "Stepped down to follower (higher term in InstallSnapshot)"
1338            );
1339        }
1340
1341        // Reject if term is stale
1342        if req.term < persistent.current_term {
1343            warn!(
1344                node_id = self.node_id(),
1345                leader = req.leader_id,
1346                current_term = persistent.current_term,
1347                request_term = req.term,
1348                "Rejecting InstallSnapshot: stale term"
1349            );
1350            return Ok(InstallSnapshotResponse::new(persistent.current_term));
1351        }
1352
1353        // Update heartbeat and leader
1354        *self.last_heartbeat.write() = Instant::now();
1355        volatile.become_follower(Some(req.leader_id));
1356        *self.candidate_state.write() = None;
1357
1358        let current_term = persistent.current_term;
1359        drop(persistent);
1360        drop(volatile);
1361
1362        // Handle chunked snapshot transfer using disk-streaming receiver to avoid RAM buffering.
1363        //
1364        // We use snapshot_dir for temp files; if unavailable, fall back to the in-memory receiver.
1365        let snapshot_dir = self.config.snapshot_dir.clone();
1366
1367        if let Some(ref dir) = snapshot_dir {
1368            // Disk-streaming path: chunks go directly to a temp file.
1369            let mut receivers = self.snapshot_stream_receivers.write();
1370
1371            // Create or retrieve the receiver for this sender.
1372            let receiver = match receivers.entry(req.leader_id) {
1373                std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
1374                std::collections::hash_map::Entry::Vacant(v) => {
1375                    let r = SnapshotStreamReceiver::new(
1376                        dir,
1377                        req.last_included_index,
1378                        req.last_included_term,
1379                    )?;
1380                    v.insert(r)
1381                }
1382            };
1383
1384            let result = receiver.receive_chunk(&req)?;
1385
1386            if let Some(final_path) = result {
1387                // Remove the completed receiver before acquiring other locks.
1388                receivers.remove(&req.leader_id);
1389                drop(receivers);
1390
1391                // Load the snapshot from the written file and install it.
1392                let snapshot_data = std::fs::read(&final_path).map_err(|e| {
1393                    crate::error::RaftError::StorageError {
1394                        message: format!(
1395                            "Failed to read received snapshot file '{}': {}",
1396                            final_path.display(),
1397                            e
1398                        ),
1399                    }
1400                })?;
1401
1402                let snapshot = Snapshot::new(
1403                    req.last_included_index,
1404                    req.last_included_term,
1405                    snapshot_data,
1406                );
1407
1408                let mut snap_guard = self.snapshot_manager.write();
1409                if let Some(manager) = snap_guard.as_mut() {
1410                    manager.install_snapshot(snapshot)?;
1411                }
1412                drop(snap_guard);
1413
1414                // Reset the log to match the snapshot
1415                let mut log = self.log.write();
1416                log.install_snapshot(req.last_included_index, req.last_included_term);
1417
1418                info!(
1419                    node_id = self.node_id(),
1420                    last_included_index = req.last_included_index,
1421                    last_included_term = req.last_included_term,
1422                    "Installed snapshot from leader (streaming path)"
1423                );
1424            }
1425        } else {
1426            // In-memory fallback: use the existing SnapshotReceiver.
1427            let mut receiver_guard = self.snapshot_receiver.write();
1428
1429            if req.offset == 0 {
1430                *receiver_guard = Some(SnapshotReceiver::new(
1431                    req.last_included_index,
1432                    req.last_included_term,
1433                ));
1434            }
1435
1436            let receiver = match receiver_guard.as_mut() {
1437                Some(r) => r,
1438                None => {
1439                    warn!(
1440                        node_id = self.node_id(),
1441                        offset = req.offset,
1442                        "Received non-initial snapshot chunk without active receiver"
1443                    );
1444                    return Ok(InstallSnapshotResponse::new(current_term));
1445                }
1446            };
1447
1448            let completed = receiver.receive_chunk(&req)?;
1449
1450            if let Some(snapshot) = completed {
1451                *receiver_guard = None;
1452                drop(receiver_guard);
1453
1454                let mut snap_guard = self.snapshot_manager.write();
1455                if let Some(manager) = snap_guard.as_mut() {
1456                    manager.install_snapshot(snapshot)?;
1457                }
1458
1459                let mut log = self.log.write();
1460                log.install_snapshot(req.last_included_index, req.last_included_term);
1461
1462                info!(
1463                    node_id = self.node_id(),
1464                    last_included_index = req.last_included_index,
1465                    last_included_term = req.last_included_term,
1466                    "Installed snapshot from leader (in-memory path)"
1467                );
1468            }
1469        }
1470
1471        Ok(InstallSnapshotResponse::new(current_term))
1472    }
1473
1474    /// Prepare an InstallSnapshot request for a follower that is too far behind.
1475    ///
1476    /// For snapshots at or below `snapshot_chunk_threshold_bytes` the full data
1477    /// is loaded into memory and sent as a single `done=true` request.  For
1478    /// larger snapshots a [`crate::snapshot::SnapshotStreamer`] is created (or
1479    /// resumed) and one chunk is emitted per call; the streamer is automatically
1480    /// cleaned up once the final chunk is delivered.
1481    ///
1482    /// Returns `Ok(None)` when this node is not the leader, when no snapshot
1483    /// exists, or when the target peer has already caught up past the snapshot
1484    /// point via normal log replication.
1485    pub fn prepare_install_snapshot(
1486        &self,
1487        target_peer: NodeId,
1488    ) -> RaftResult<Option<InstallSnapshotRequest>> {
1489        // Must be leader to send snapshots.
1490        if !self.volatile.read().is_leader() {
1491            return Ok(None);
1492        }
1493
1494        // Determine whether the peer still needs a snapshot and which snapshot to use.
1495        // We extract all needed values while holding the read locks, then drop them
1496        // before acquiring any write locks (no lock inversion).
1497        let snap_meta = {
1498            let snap_guard = self.snapshot_manager.read();
1499            match snap_guard.as_ref() {
1500                Some(m) => match m.get_latest_metadata() {
1501                    Some(meta) => meta.clone(),
1502                    None => return Ok(None),
1503                },
1504                None => return Ok(None),
1505            }
1506        };
1507
1508        // Check if the peer actually needs a snapshot.
1509        let next_index = {
1510            let leader_guard = self.leader_state.read();
1511            match leader_guard.as_ref() {
1512                Some(ls) => ls.get_next_index(target_peer),
1513                // Not leader anymore — bail out.
1514                None => return Ok(None),
1515            }
1516        };
1517
1518        if next_index > snap_meta.last_included_index {
1519            // Peer has caught up — clean up any in-progress streamer and return.
1520            self.snapshot_streamers.write().remove(&target_peer);
1521            return Ok(None);
1522        }
1523
1524        let term = self.current_term();
1525        let threshold = self.config.snapshot_chunk_threshold_bytes;
1526
1527        if snap_meta.size_bytes <= threshold {
1528            // Small snapshot: single-shot (original behaviour, no streamer required).
1529            let snapshot = {
1530                let snap_guard = self.snapshot_manager.read();
1531                match snap_guard.as_ref() {
1532                    Some(m) => m.load_latest()?.ok_or_else(|| RaftError::StorageError {
1533                        message: "snapshot disappeared between metadata fetch and load".into(),
1534                    })?,
1535                    None => return Ok(None),
1536                }
1537            };
1538            return Ok(Some(InstallSnapshotRequest::new_complete(
1539                term,
1540                self.node_id(),
1541                snap_meta.last_included_index,
1542                snap_meta.last_included_term,
1543                snapshot.data,
1544            )));
1545        }
1546
1547        // Large snapshot: chunked streaming.
1548        let chunk_size = self.config.snapshot_chunk_size_bytes;
1549
1550        // Obtain the on-disk path before acquiring the streamer write-lock so
1551        // we never hold both the snapshot_manager lock and the streamer lock
1552        // at the same time.
1553        let data_path_for_new = {
1554            let snap_guard = self.snapshot_manager.read();
1555            match snap_guard.as_ref() {
1556                Some(m) => m
1557                    .latest_data_path()
1558                    .ok_or_else(|| RaftError::StorageError {
1559                        message: "snapshot data path not found for chunked transfer".into(),
1560                    })?,
1561                None => return Ok(None),
1562            }
1563        };
1564
1565        let mut streamers = self.snapshot_streamers.write();
1566        let streamer = match streamers.entry(target_peer) {
1567            std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
1568            std::collections::hash_map::Entry::Vacant(v) => {
1569                v.insert(crate::snapshot::SnapshotStreamer::new(
1570                    data_path_for_new,
1571                    snap_meta.clone(),
1572                    chunk_size,
1573                )?)
1574            }
1575        };
1576
1577        let req = streamer.next_chunk_for_rpc(term, self.node_id())?;
1578
1579        // If the final chunk was just emitted, remove the streamer.
1580        let is_done = req.as_ref().is_some_and(|r| r.done);
1581        drop(streamers);
1582        if is_done {
1583            self.snapshot_streamers.write().remove(&target_peer);
1584        }
1585
1586        Ok(req)
1587    }
1588
1589    /// Check if a follower needs a snapshot instead of normal log replication
1590    ///
1591    /// Returns true if the follower's next_index is at or before the snapshot point.
1592    pub fn follower_needs_snapshot(&self, peer: NodeId) -> bool {
1593        let leader_state_guard = self.leader_state.read();
1594        let leader_state = match leader_state_guard.as_ref() {
1595            Some(s) => s,
1596            None => return false,
1597        };
1598
1599        let next_index = leader_state.get_next_index(peer);
1600        let log = self.log.read();
1601        let (snap_idx, _) = log.get_snapshot_point();
1602
1603        snap_idx > 0 && next_index <= snap_idx
1604    }
1605
1606    // ── Membership change (joint consensus) ──────────────────────────
1607
1608    /// Add a node to the cluster via joint consensus.
1609    ///
1610    /// If this node is the leader the change is proposed immediately.
1611    /// Returns an error if a membership change is already in progress or
1612    /// the node is already a member.
1613    pub fn add_node(&self, node_id: NodeId, address: String) -> RaftResult<()> {
1614        self.propose_membership_change(MembershipChange::AddNode { node_id, address })
1615    }
1616
1617    /// Remove a node from the cluster via joint consensus.
1618    ///
1619    /// If the removed node is this node, it will step down after the
1620    /// configuration change commits.
1621    pub fn remove_node(&self, node_id: NodeId) -> RaftResult<()> {
1622        self.propose_membership_change(MembershipChange::RemoveNode { node_id })
1623    }
1624
1625    /// Get the current list of cluster members as `(node_id, address)` pairs.
1626    pub fn cluster_members(&self) -> Vec<(NodeId, String)> {
1627        self.config_state.read().all_members()
1628    }
1629
1630    /// Check whether the cluster is currently in joint consensus.
1631    pub fn is_in_joint_consensus(&self) -> bool {
1632        self.config_state.read().is_joint()
1633    }
1634
1635    /// Get the current membership configuration version.
1636    pub fn membership_version(&self) -> u64 {
1637        self.config_state.read().version()
1638    }
1639
1640    /// Propose a membership change (leader only).
1641    ///
1642    /// Implements the simplified Raft joint consensus protocol (Section 6):
1643    /// 1. Leader creates joint config C_{old,new} and replicates it.
1644    /// 2. Once C_{old,new} is committed the leader creates C_{new}.
1645    /// 3. During the joint state, decisions require a majority of **both**
1646    ///    the old and new configurations.
1647    pub fn propose_membership_change(&self, change: MembershipChange) -> RaftResult<()> {
1648        // Must be leader
1649        let volatile = self.volatile.read();
1650        if !volatile.is_leader() {
1651            return Err(RaftError::NotLeader {
1652                leader_id: volatile.leader_id,
1653            });
1654        }
1655        drop(volatile);
1656
1657        let mut cs = self.config_state.write();
1658
1659        // Only one membership change at a time
1660        if cs.is_joint() {
1661            return Err(RaftError::MembershipChangeInProgress);
1662        }
1663
1664        let current = match &*cs {
1665            ConfigState::Stable(c) => c.clone(),
1666            ConfigState::Joint { .. } => return Err(RaftError::MembershipChangeInProgress),
1667        };
1668
1669        // Build the new config
1670        let new_config = match &change {
1671            MembershipChange::AddNode { node_id, address } => {
1672                if current.contains(*node_id) {
1673                    return Err(RaftError::NodeAlreadyMember { node_id: *node_id });
1674                }
1675                current.with_added_member(*node_id, address.clone())
1676            }
1677            MembershipChange::RemoveNode { node_id } => {
1678                if !current.contains(*node_id) {
1679                    return Err(RaftError::NodeNotMember { node_id: *node_id });
1680                }
1681                current.without_member(*node_id)
1682            }
1683        };
1684
1685        // Enter joint consensus C_{old,new}
1686        *cs = ConfigState::Joint {
1687            old: current.clone(),
1688            new: new_config.clone(),
1689        };
1690
1691        info!(
1692            node_id = self.node_id(),
1693            change = ?change,
1694            old_version = current.version(),
1695            new_version = new_config.version(),
1696            "Entered joint consensus"
1697        );
1698
1699        // Update the peers list and leader state to include the union of
1700        // both configs so replication / heartbeats reach the new member.
1701        self.update_leader_state_for_config(&cs);
1702
1703        // Append a joint-config marker to the log so followers replicate it.
1704        let term = self.current_term();
1705        let mut log = self.log.write();
1706        let _index = log.append(term, Command::from_str("__membership_joint__"));
1707
1708        Ok(())
1709    }
1710
1711    /// Commit the joint consensus transition: move from C_{old,new} to C_{new}.
1712    ///
1713    /// Call this once the joint config entry has been committed (i.e. acknowledged
1714    /// by a quorum of **both** old and new configs).
1715    pub fn commit_membership_change(&self) -> RaftResult<()> {
1716        let mut cs = self.config_state.write();
1717        let new_config = match &*cs {
1718            ConfigState::Joint { new, .. } => new.clone(),
1719            ConfigState::Stable(_) => {
1720                // Nothing to do -- already stable
1721                return Ok(());
1722            }
1723        };
1724
1725        *cs = ConfigState::Stable(new_config.clone());
1726
1727        info!(
1728            node_id = self.node_id(),
1729            version = new_config.version(),
1730            members = ?new_config.member_ids(),
1731            "Committed membership change, now stable"
1732        );
1733
1734        // Update leader state to reflect the final config
1735        self.update_leader_state_for_config(&cs);
1736
1737        // Append a stable-config marker so followers learn about it
1738        let term = self.current_term();
1739        let mut log = self.log.write();
1740        let _index = log.append(term, Command::from_str("__membership_stable__"));
1741
1742        // If we (the leader) were removed, step down
1743        if !new_config.contains(self.node_id()) {
1744            drop(cs);
1745            drop(log);
1746            self.step_down();
1747        }
1748
1749        Ok(())
1750    }
1751
1752    /// Calculate whether a set of nodes forms a quorum under the current
1753    /// membership config (handles both stable and joint states).
1754    pub fn has_quorum(&self, responding_nodes: &HashSet<NodeId>) -> bool {
1755        self.config_state.read().has_quorum(responding_nodes)
1756    }
1757
1758    /// Check if this node is stepping down (has been removed from the cluster).
1759    pub fn is_stepping_down(&self) -> bool {
1760        *self.stepping_down.read()
1761    }
1762
1763    /// Attach an optional shard placement scheduler.
1764    ///
1765    /// The scheduler is started in the background via `tokio::spawn` and its
1766    /// handle is stored so that `step_down` can stop it
1767    /// automatically when this node loses leadership.  Subsequent calls replace
1768    /// the previous handle, stopping the old scheduler first.
1769    pub fn attach_placement_scheduler(
1770        self: &Arc<Self>,
1771        scheduler: crate::placement_scheduler::PlacementScheduler,
1772    ) {
1773        let handle = scheduler.handle();
1774        tokio::spawn(scheduler.run());
1775        let mut guard = self.placement_scheduler_handle.write();
1776        if let Some(old) = guard.take() {
1777            old.stop();
1778        }
1779        *guard = Some(handle);
1780    }
1781
1782    /// Gracefully step down: revert to follower and mark as stepping down.
1783    fn step_down(&self) {
1784        let mut volatile = self.volatile.write();
1785        volatile.become_follower(None);
1786        *self.leader_state.write() = None;
1787        *self.candidate_state.write() = None;
1788        *self.stepping_down.write() = true;
1789        self.snapshot_streamers.write().clear();
1790        self.snapshot_stream_receivers.write().clear();
1791        if let Some(handle) = self.placement_scheduler_handle.write().take() {
1792            handle.stop();
1793        }
1794
1795        info!(
1796            node_id = self.node_id(),
1797            "Stepping down -- removed from cluster"
1798        );
1799    }
1800
1801    /// Synchronize leader replication state with the current config so
1802    /// new members receive heartbeats / log entries.
1803    fn update_leader_state_for_config(&self, cs: &ConfigState) {
1804        let mut ls_guard = self.leader_state.write();
1805        let ls = match ls_guard.as_mut() {
1806            Some(s) => s,
1807            None => return,
1808        };
1809
1810        let all_ids = cs.all_member_ids();
1811        let log = self.log.read();
1812        let last_log_index = log.last_index();
1813
1814        // Add entries for any new peers that are not yet tracked
1815        for &id in &all_ids {
1816            if id == self.node_id() {
1817                continue;
1818            }
1819            ls.next_index.entry(id).or_insert(last_log_index + 1);
1820            ls.match_index.entry(id).or_insert(0);
1821        }
1822
1823        // Remove entries for peers that are no longer in any config
1824        ls.next_index
1825            .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1826        ls.match_index
1827            .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1828    }
1829
1830    /// Check if election timeout has elapsed.
1831    ///
1832    /// The effective timeout is computed from whichever is **larger**:
1833    ///
1834    /// - The static `RaftConfig::random_election_timeout()` from the initial
1835    ///   node configuration, or
1836    /// - `2 × dynamic_config.heartbeat_interval_ms`, the Raft-safe lower bound
1837    ///   derived from the hot-reloadable heartbeat interval.
1838    ///
1839    /// This means that updating `DynamicConfig::heartbeat_interval_ms` at
1840    /// runtime also extends (or contracts) the election timeout proportionally,
1841    /// so the invariant `election_timeout ≥ 2 × heartbeat_interval` is always
1842    /// maintained even after a hot-reload.
1843    pub fn election_timeout_elapsed(&self) -> bool {
1844        let last_heartbeat = *self.last_heartbeat.read();
1845        let static_timeout = self.config.random_election_timeout();
1846        // Dynamic lower bound: 2 × current heartbeat interval.
1847        let dynamic_min_ms = self
1848            .dynamic_config
1849            .read()
1850            .heartbeat_interval_ms
1851            .saturating_mul(2);
1852        let effective_timeout = static_timeout.max(Duration::from_millis(dynamic_min_ms));
1853        last_heartbeat.elapsed() >= effective_timeout
1854    }
1855
1856    /// Reset election timer
1857    pub fn reset_election_timer(&self) {
1858        *self.last_heartbeat.write() = Instant::now();
1859    }
1860
1861    /// Get a hint about the current leader (for client redirection)
1862    pub fn get_leader_hint(&self) -> Option<NodeId> {
1863        self.volatile.read().leader_id
1864    }
1865
1866    /// Return a clone of the current dynamic configuration.
1867    pub fn get_dynamic_config(&self) -> DynamicConfig {
1868        self.dynamic_config.read().clone()
1869    }
1870
1871    /// Hot-reload the dynamic configuration fields.
1872    ///
1873    /// This is the primary mechanism for applying operator changes (e.g. via
1874    /// `SIGHUP` or an admin RPC) without restarting the node.  Only the fields
1875    /// in [`DynamicConfig`] are updated; fields that require a full restart
1876    /// (bind_addr, node_id, peers, …) cannot be changed here.
1877    ///
1878    /// The new values take effect on the **next Raft event-loop tick** —
1879    /// typically within one `heartbeat_interval_ms`.
1880    pub fn update_dynamic_config(&self, new_config: DynamicConfig) {
1881        *self.dynamic_config.write() = new_config;
1882        debug!(
1883            node_id = self.node_id(),
1884            heartbeat_interval_ms = self.dynamic_config.read().heartbeat_interval_ms,
1885            compaction_threshold = self.dynamic_config.read().compaction_threshold,
1886            "Dynamic configuration updated"
1887        );
1888    }
1889
1890    // ── State machine application ──────────────────────────────────
1891
1892    /// Register a [`StateMachine`] to receive committed log entries.
1893    ///
1894    /// After this call every committed entry that has not yet been applied will
1895    /// be fed to the state machine the next time the commit index advances.
1896    /// Only one state machine can be active at a time; calling this method
1897    /// again replaces the previous one.
1898    pub fn set_state_machine(&self, sm: impl StateMachine + 'static) -> RaftResult<()> {
1899        let mut guard = self.state_machine.lock();
1900        *guard = Some(Box::new(sm));
1901        Ok(())
1902    }
1903
1904    /// Apply all committed-but-not-yet-applied log entries to the registered
1905    /// state machine (if any).
1906    ///
1907    /// The method acquires the log write-lock briefly for each batch and updates
1908    /// `applied_index` on success.  Errors from the state machine are propagated
1909    /// to the caller; the `applied_index` advances only for successfully applied
1910    /// entries.
1911    fn apply_committed_entries(&self) -> RaftResult<()> {
1912        // Collect entries to apply while holding the log read-lock, then
1913        // release it before calling the (potentially slow) state machine.
1914        let entries_to_apply = {
1915            let log = self.log.read();
1916            if log.applied_index() >= log.commit_index() {
1917                return Ok(());
1918            }
1919            log.get_uncommitted_entries()
1920        };
1921
1922        if entries_to_apply.is_empty() {
1923            return Ok(());
1924        }
1925
1926        let mut sm_guard = self.state_machine.lock();
1927        let sm = match sm_guard.as_mut() {
1928            Some(s) => s,
1929            None => {
1930                // No state machine registered — just advance applied_index.
1931                let mut log = self.log.write();
1932                if let Some(last) = entries_to_apply.last() {
1933                    log.set_applied_index(last.index)?;
1934                }
1935                return Ok(());
1936            }
1937        };
1938
1939        for entry in &entries_to_apply {
1940            sm.apply(entry).map_err(|e| {
1941                warn!(
1942                    node_id = self.node_id(),
1943                    index = entry.index,
1944                    error = ?e,
1945                    "State machine failed to apply entry"
1946                );
1947                e
1948            })?;
1949            // Advance applied_index one step at a time so that on error the
1950            // log reflects the last successfully applied entry.
1951            self.log.write().set_applied_index(entry.index)?;
1952        }
1953
1954        Ok(())
1955    }
1956
1957    /// Attach an [`AlertManager`] to this node.
1958    ///
1959    /// After this call the node will emit [`AlertEvent`]s to the manager on
1960    /// significant state changes (leader election, slow replication, etc.).
1961    /// Only one manager can be active at a time; calling this again replaces
1962    /// the previous one.
1963    pub fn set_alert_manager(&mut self, am: Arc<AlertManager>) {
1964        self.alert_manager = Some(am);
1965    }
1966
1967    /// Trigger a failover election if this node is a follower.
1968    /// Returns the vote requests to send to peers, or an empty vec
1969    /// if the node is not in follower state.
1970    pub fn trigger_failover_election(&self) -> Vec<RequestVoteRequest> {
1971        let state = self.volatile.read().node_state;
1972        if state != NodeState::Follower {
1973            return Vec::new();
1974        }
1975        self.start_election()
1976    }
1977}
1978
1979// ---------------------------------------------------------------------------
1980// WAL replay helper (extracted to node_wal_replay.rs to keep this file ≤ 2 000 lines)
1981// ---------------------------------------------------------------------------
1982
1983#[path = "node_wal_replay.rs"]
1984mod wal_replay;
1985use wal_replay::replay_wal_into_log;
1986
1987#[cfg(test)]
1988#[path = "node_tests.rs"]
1989mod tests;
1990
1991#[cfg(test)]
1992#[path = "node_tests_advanced.rs"]
1993mod tests_advanced;
1994
1995#[cfg(test)]
1996#[path = "integration_tests.rs"]
1997mod integration_tests;
1998#[cfg(test)]
1999#[path = "node_snapshot_tests.rs"]
2000mod snapshot_tests;