1use crate::config::DynamicConfig;
4use crate::error::{RaftError, RaftResult};
5use crate::failover::{AlertEvent, AlertManager, FailoverConfig, FailoverCoordinator};
6use crate::log::{Command, LogEntry, RaftLog, StateMachine};
7use crate::persistence::{FilePersistence, RaftPersistence};
8use crate::rpc::{
9 AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
10};
11use crate::snapshot::{
12 InstallSnapshotRequest, InstallSnapshotResponse, Snapshot, SnapshotConfig, SnapshotManager,
13 SnapshotPolicy, SnapshotReceiver, SnapshotStreamReceiver,
14};
15use crate::state::FencingTokenState;
16use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
17use crate::types::{
18 ClusterConfig, ConfigState, FencingToken, HeartbeatConfig, LogIndex, MembershipChange, NodeId,
19 NodeState, RaftConfig, Term,
20};
21use crate::wal::{CorruptionPolicy, WalReader};
22use parking_lot::RwLock;
23use std::collections::HashSet;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
29pub struct RaftNode {
31 config: Arc<RaftConfig>,
33 persistent: Arc<RwLock<PersistentState>>,
35 volatile: Arc<RwLock<VolatileState>>,
37 log: Arc<RwLock<RaftLog>>,
39 leader_state: Arc<RwLock<Option<LeaderState>>>,
41 candidate_state: Arc<RwLock<Option<CandidateState>>>,
43 last_heartbeat: Arc<RwLock<Instant>>,
45 snapshot_manager: Arc<RwLock<Option<SnapshotManager>>>,
47 snapshot_receiver: Arc<RwLock<Option<SnapshotReceiver>>>,
49 persistence: Option<Arc<dyn RaftPersistence>>,
51 config_state: Arc<RwLock<ConfigState>>,
53 stepping_down: Arc<RwLock<bool>>,
55 fencing_token_state: Arc<FencingTokenState>,
57 is_recovering: Arc<AtomicBool>,
59 snapshot_streamers:
65 Arc<RwLock<std::collections::HashMap<NodeId, crate::snapshot::SnapshotStreamer>>>,
66 snapshot_stream_receivers:
71 Arc<RwLock<std::collections::HashMap<NodeId, SnapshotStreamReceiver>>>,
72 pub dynamic_config: Arc<RwLock<DynamicConfig>>,
82 pub failover_coordinator: Arc<RwLock<FailoverCoordinator>>,
88 placement_scheduler_handle:
93 Arc<RwLock<Option<crate::placement_scheduler::PlacementSchedulerHandle>>>,
94 state_machine: Arc<parking_lot::Mutex<Option<Box<dyn StateMachine>>>>,
99 alert_manager: Option<Arc<AlertManager>>,
104}
105
106impl RaftNode {
107 pub fn new(config: RaftConfig) -> RaftResult<Self> {
109 config
111 .validate()
112 .map_err(|msg| RaftError::ConfigError { message: msg })?;
113
114 let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
116 let snap_config =
117 SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
118 Some(SnapshotManager::new(snap_config)?)
119 } else {
120 None
121 };
122
123 let persistence: Option<Arc<dyn RaftPersistence>> =
125 if let Some(ref dir) = config.persistence_dir {
126 Some(Arc::new(FilePersistence::new(dir, config.sync_on_write)?))
127 } else {
128 None
129 };
130
131 let (persistent_state, mut raft_log) = if let Some(ref p) = persistence {
133 let (term, voted_for) = p.load_state()?;
134 let mut ps = PersistentState::new();
135 ps.current_term = term;
136 ps.voted_for = voted_for;
137
138 let entries = p.load_log()?;
139 let mut log = RaftLog::new();
140 if !entries.is_empty() {
141 log.append_entries(entries)?;
142 }
143
144 let applied_idx = p.load_applied_index()?;
146 if applied_idx > 0 && applied_idx <= log.last_index() {
147 if let Err(e) = log.set_commit_index(applied_idx) {
149 warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
150 } else if let Err(e) = log.set_applied_index(applied_idx) {
151 warn!(applied_idx, error = ?e, "Failed to restore applied index");
152 }
153 }
154
155 info!(
156 node_id = config.node_id,
157 term = term,
158 voted_for = ?voted_for,
159 last_log_index = log.last_index(),
160 "Recovered state from persistence"
161 );
162
163 (ps, log)
164 } else {
165 (PersistentState::new(), RaftLog::new())
166 };
167
168 let is_recovering = Arc::new(AtomicBool::new(false));
172 if let Some(ref wal_dir) = config.wal_dir {
173 is_recovering.store(true, Ordering::Release);
174 let result = replay_wal_into_log(wal_dir, &mut raft_log);
175 is_recovering.store(false, Ordering::Release);
176 result?;
177 }
178
179 let initial_members: Vec<(NodeId, String)> =
181 config.peers.iter().map(|&id| (id, String::new())).collect();
182 let config_state = ConfigState::new_stable(initial_members);
183
184 let hb_interval_ms = config.heartbeat_interval;
186 let failover_coordinator = {
187 let hb_cfg = HeartbeatConfig::new(hb_interval_ms, hb_interval_ms * 10, 3);
188 let mut coord =
189 FailoverCoordinator::new(hb_cfg, FailoverConfig::default(), config.node_id);
190 for &peer in config.peers.iter().filter(|&&p| p != config.node_id) {
191 let _ = coord.track_peer(peer);
193 }
194 Arc::new(RwLock::new(coord))
195 };
196
197 Ok(Self {
198 config: Arc::new(config),
199 persistent: Arc::new(RwLock::new(persistent_state)),
200 volatile: Arc::new(RwLock::new(VolatileState::new())),
201 log: Arc::new(RwLock::new(raft_log)),
202 leader_state: Arc::new(RwLock::new(None)),
203 candidate_state: Arc::new(RwLock::new(None)),
204 last_heartbeat: Arc::new(RwLock::new(Instant::now())),
205 snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
206 snapshot_receiver: Arc::new(RwLock::new(None)),
207 snapshot_streamers: Arc::new(RwLock::new(std::collections::HashMap::new())),
208 snapshot_stream_receivers: Arc::new(RwLock::new(std::collections::HashMap::new())),
209 persistence,
210 config_state: Arc::new(RwLock::new(config_state)),
211 stepping_down: Arc::new(RwLock::new(false)),
212 fencing_token_state: Arc::new(FencingTokenState::new()),
213 is_recovering,
214 dynamic_config: Arc::new(RwLock::new(DynamicConfig {
215 heartbeat_interval_ms: hb_interval_ms,
216 compaction_threshold: 10_000,
217 })),
218 failover_coordinator,
219 placement_scheduler_handle: Arc::new(RwLock::new(None)),
220 state_machine: Arc::new(parking_lot::Mutex::new(None)),
221 alert_manager: None,
222 })
223 }
224
225 pub fn with_persistence(
230 config: RaftConfig,
231 persistence: Arc<dyn RaftPersistence>,
232 ) -> RaftResult<Self> {
233 config
234 .validate()
235 .map_err(|msg| RaftError::ConfigError { message: msg })?;
236
237 let snapshot_manager = if let Some(ref dir) = config.snapshot_dir {
238 let snap_config =
239 SnapshotConfig::new(dir.clone(), config.max_snapshots, config.snapshot_threshold);
240 Some(SnapshotManager::new(snap_config)?)
241 } else {
242 None
243 };
244
245 let (term, voted_for) = persistence.load_state()?;
246 let mut ps = PersistentState::new();
247 ps.current_term = term;
248 ps.voted_for = voted_for;
249
250 let entries = persistence.load_log()?;
251 let mut raft_log = RaftLog::new();
252 if !entries.is_empty() {
253 raft_log.append_entries(entries)?;
254 }
255
256 let applied_idx = persistence.load_applied_index()?;
258 if applied_idx > 0 && applied_idx <= raft_log.last_index() {
259 if let Err(e) = raft_log.set_commit_index(applied_idx) {
260 warn!(applied_idx, error = ?e, "Failed to restore commit index from applied_index");
261 } else if let Err(e) = raft_log.set_applied_index(applied_idx) {
262 warn!(applied_idx, error = ?e, "Failed to restore applied index");
263 }
264 }
265
266 info!(
267 node_id = config.node_id,
268 term = term,
269 voted_for = ?voted_for,
270 last_log_index = raft_log.last_index(),
271 "Recovered state via explicit persistence"
272 );
273
274 let is_recovering = Arc::new(AtomicBool::new(false));
276 if let Some(ref wal_dir) = config.wal_dir {
277 is_recovering.store(true, Ordering::Release);
278 let result = replay_wal_into_log(wal_dir, &mut raft_log);
279 is_recovering.store(false, Ordering::Release);
280 result?;
281 }
282
283 let initial_members: Vec<(NodeId, String)> =
284 config.peers.iter().map(|&id| (id, String::new())).collect();
285 let config_state = ConfigState::new_stable(initial_members);
286
287 let hb_interval_ms_wp = config.heartbeat_interval;
289 let failover_coordinator_wp = {
290 let hb_cfg = HeartbeatConfig::new(hb_interval_ms_wp, hb_interval_ms_wp * 10, 3);
291 let mut coord =
292 FailoverCoordinator::new(hb_cfg, FailoverConfig::default(), config.node_id);
293 for &peer in config.peers.iter().filter(|&&p| p != config.node_id) {
294 let _ = coord.track_peer(peer);
295 }
296 Arc::new(RwLock::new(coord))
297 };
298
299 Ok(Self {
300 config: Arc::new(config),
301 persistent: Arc::new(RwLock::new(ps)),
302 volatile: Arc::new(RwLock::new(VolatileState::new())),
303 log: Arc::new(RwLock::new(raft_log)),
304 leader_state: Arc::new(RwLock::new(None)),
305 candidate_state: Arc::new(RwLock::new(None)),
306 last_heartbeat: Arc::new(RwLock::new(Instant::now())),
307 snapshot_manager: Arc::new(RwLock::new(snapshot_manager)),
308 snapshot_receiver: Arc::new(RwLock::new(None)),
309 snapshot_streamers: Arc::new(RwLock::new(std::collections::HashMap::new())),
310 snapshot_stream_receivers: Arc::new(RwLock::new(std::collections::HashMap::new())),
311 persistence: Some(persistence),
312 config_state: Arc::new(RwLock::new(config_state)),
313 stepping_down: Arc::new(RwLock::new(false)),
314 fencing_token_state: Arc::new(FencingTokenState::new()),
315 is_recovering,
316 dynamic_config: Arc::new(RwLock::new(DynamicConfig {
317 heartbeat_interval_ms: hb_interval_ms_wp,
318 compaction_threshold: 10_000,
319 })),
320 failover_coordinator: failover_coordinator_wp,
321 placement_scheduler_handle: Arc::new(RwLock::new(None)),
322 state_machine: Arc::new(parking_lot::Mutex::new(None)),
323 alert_manager: None,
324 })
325 }
326
327 fn persist_state(&self, term: Term, voted_for: Option<NodeId>) {
330 if let Some(ref p) = self.persistence {
331 if let Err(e) = p.save_state(term, voted_for) {
332 warn!(node_id = self.node_id(), error = ?e, "Failed to persist state");
333 }
334 }
335 }
336
337 fn persist_log_entries(&self, entries: &[LogEntry]) {
339 if let Some(ref p) = self.persistence {
340 if let Err(e) = p.append_entries(entries) {
341 warn!(node_id = self.node_id(), error = ?e, "Failed to persist log entries");
342 }
343 }
344 }
345
346 fn persist_log_truncation(&self, from_index: LogIndex) {
348 if let Some(ref p) = self.persistence {
349 if let Err(e) = p.truncate_log_from(from_index) {
350 warn!(node_id = self.node_id(), error = ?e, "Failed to persist log truncation");
351 }
352 }
353 }
354
355 pub fn node_id(&self) -> NodeId {
357 self.config.node_id
358 }
359
360 pub fn current_term(&self) -> Term {
362 self.persistent.read().current_term
363 }
364
365 pub fn state(&self) -> NodeState {
367 self.volatile.read().node_state
368 }
369
370 pub fn leader_id(&self) -> Option<NodeId> {
372 self.volatile.read().leader_id
373 }
374
375 pub fn is_leader(&self) -> bool {
377 self.volatile.read().is_leader()
378 }
379
380 pub fn commit_index(&self) -> LogIndex {
382 self.log.read().commit_index()
383 }
384
385 pub fn last_log_index(&self) -> LogIndex {
387 self.log.read().last_index()
388 }
389
390 pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
392 let volatile = self.volatile.read();
393 if !volatile.is_leader() {
394 return Err(RaftError::NotLeader {
395 leader_id: volatile.leader_id,
396 });
397 }
398 drop(volatile);
399
400 let term = self.current_term();
401 let mut log = self.log.write();
402 let index = log.append(term, command.clone());
403
404 let entry = LogEntry::new(term, index, command);
406 self.persist_log_entries(&[entry]);
407
408 info!(
409 node_id = self.node_id(),
410 index = index,
411 term = term,
412 "Proposed new entry"
413 );
414
415 Ok(index)
416 }
417
418 pub fn is_recovering(&self) -> bool {
420 self.is_recovering.load(Ordering::Acquire)
421 }
422
423 pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
425 if self.is_recovering.load(Ordering::Acquire) {
427 warn!(
428 node_id = self.node_id(),
429 candidate = req.candidate_id,
430 event = "rpc_rejected_recovering",
431 "Rejecting RequestVote: node is recovering from WAL"
432 );
433 return RequestVoteResponse::rejected(self.current_term());
434 }
435
436 let mut persistent = self.persistent.write();
437 let mut volatile = self.volatile.write();
438
439 debug!(
440 node_id = self.node_id(),
441 candidate = req.candidate_id,
442 term = req.term,
443 "Received RequestVote"
444 );
445
446 if req.term > persistent.current_term {
448 let from_term = persistent.current_term;
449 persistent.update_term(req.term);
450 self.persist_state(persistent.current_term, persistent.voted_for);
451 volatile.become_follower(None);
452 *self.leader_state.write() = None;
453 *self.candidate_state.write() = None;
454 crate::metrics::global().set_current_term(persistent.current_term);
455 debug!(
456 node_id = self.node_id(),
457 from_term = from_term,
458 to_term = persistent.current_term,
459 "Stepped down to follower (higher term in RequestVote)"
460 );
461 }
462
463 if req.term < persistent.current_term {
465 warn!(
466 node_id = self.node_id(),
467 candidate = req.candidate_id,
468 current_term = persistent.current_term,
469 request_term = req.term,
470 "Rejecting vote: stale term"
471 );
472 return RequestVoteResponse::rejected(persistent.current_term);
473 }
474
475 if let Some(voted_for) = persistent.voted_for {
477 if voted_for != req.candidate_id {
478 warn!(
479 node_id = self.node_id(),
480 candidate = req.candidate_id,
481 voted_for = voted_for,
482 "Rejecting vote: already voted"
483 );
484 return RequestVoteResponse::rejected(persistent.current_term);
485 }
486 }
487
488 let log = self.log.read();
490 let our_last_index = log.last_index();
491 let our_last_term = log.last_term();
492
493 let log_ok = req.last_log_term > our_last_term
494 || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);
495
496 if !log_ok {
497 warn!(
498 node_id = self.node_id(),
499 candidate = req.candidate_id,
500 our_last_index = our_last_index,
501 our_last_term = our_last_term,
502 candidate_last_index = req.last_log_index,
503 candidate_last_term = req.last_log_term,
504 "Rejecting vote: candidate log not up-to-date"
505 );
506 return RequestVoteResponse::rejected(persistent.current_term);
507 }
508
509 persistent.grant_vote(req.candidate_id);
511 self.persist_state(persistent.current_term, persistent.voted_for);
512 *self.last_heartbeat.write() = Instant::now();
513
514 info!(
515 node_id = self.node_id(),
516 candidate = req.candidate_id,
517 term = req.term,
518 "Granted vote"
519 );
520
521 RequestVoteResponse::granted(persistent.current_term)
522 }
523
524 pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
526 if self.is_recovering.load(Ordering::Acquire) {
528 warn!(
529 node_id = self.node_id(),
530 leader = req.leader_id,
531 event = "rpc_rejected_recovering",
532 "Rejecting AppendEntries: node is recovering from WAL"
533 );
534 return AppendEntriesResponse::rejected(self.current_term());
535 }
536
537 let mut persistent = self.persistent.write();
538 let mut volatile = self.volatile.write();
539
540 debug!(
541 node_id = self.node_id(),
542 leader = req.leader_id,
543 term = req.term,
544 entries = req.entries.len(),
545 "Received AppendEntries"
546 );
547
548 if req.term > persistent.current_term {
550 let from_term = persistent.current_term;
551 persistent.update_term(req.term);
552 self.persist_state(persistent.current_term, persistent.voted_for);
553 volatile.become_follower(Some(req.leader_id));
554 *self.leader_state.write() = None;
555 *self.candidate_state.write() = None;
556 crate::metrics::global().set_current_term(persistent.current_term);
557 debug!(
558 node_id = self.node_id(),
559 from_term = from_term,
560 to_term = persistent.current_term,
561 leader_id = req.leader_id,
562 "Stepped down to follower (higher term in AppendEntries)"
563 );
564 }
565
566 if req.term < persistent.current_term {
568 warn!(
569 node_id = self.node_id(),
570 leader = req.leader_id,
571 current_term = persistent.current_term,
572 request_term = req.term,
573 "Rejecting AppendEntries: stale term"
574 );
575 return AppendEntriesResponse::rejected(persistent.current_term);
576 }
577
578 *self.last_heartbeat.write() = Instant::now();
580 volatile.become_follower(Some(req.leader_id));
581 *self.candidate_state.write() = None;
582
583 self.failover_coordinator.write().set_leader(req.leader_id);
586
587 drop(persistent);
588 drop(volatile);
589
590 let mut log = self.log.write();
592 let our_last_index = log.last_index();
593
594 if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
596 let conflict_index = req.prev_log_index.min(our_last_index);
598 let conflict_term = log.get_term(conflict_index).unwrap_or(0);
599
600 warn!(
601 node_id = self.node_id(),
602 prev_log_index = req.prev_log_index,
603 prev_log_term = req.prev_log_term,
604 conflict_index = conflict_index,
605 conflict_term = conflict_term,
606 "Rejecting AppendEntries: log inconsistency"
607 );
608
609 return AppendEntriesResponse::failure(
610 self.current_term(),
611 our_last_index,
612 conflict_index,
613 conflict_term,
614 );
615 }
616
617 if !req.entries.is_empty() {
619 let first_new_index = req.entries[0].index;
621 if first_new_index <= our_last_index {
622 if let Err(e) = log.truncate_from(first_new_index) {
623 warn!(
624 node_id = self.node_id(),
625 error = ?e,
626 "Failed to truncate log"
627 );
628 return AppendEntriesResponse::rejected(self.current_term());
629 }
630 self.persist_log_truncation(first_new_index);
631 }
632
633 self.persist_log_entries(&req.entries);
635
636 if let Err(e) = log.append_entries(req.entries) {
638 warn!(
639 node_id = self.node_id(),
640 error = ?e,
641 "Failed to append entries"
642 );
643 return AppendEntriesResponse::rejected(self.current_term());
644 }
645 }
646
647 if req.leader_commit > log.commit_index() {
649 let old_commit = log.commit_index();
650 let new_commit = req.leader_commit.min(log.last_index());
651 if let Err(e) = log.set_commit_index(new_commit) {
652 warn!(
653 node_id = self.node_id(),
654 error = ?e,
655 "Failed to update commit index"
656 );
657 } else {
658 crate::metrics::global().set_commit_index(new_commit);
659 crate::metrics::global().set_log_entry_count(log.last_index());
660 debug!(
661 node_id = self.node_id(),
662 old_commit_index = old_commit,
663 new_commit_index = new_commit,
664 "Updated commit index"
665 );
666 drop(log);
669 if let Err(e) = self.apply_committed_entries() {
670 warn!(node_id = self.node_id(), error = ?e, "Failed to apply committed entries");
671 }
672 return AppendEntriesResponse::success(self.current_term(), self.last_log_index());
673 }
674 }
675
676 AppendEntriesResponse::success(self.current_term(), log.last_index())
677 }
678
679 pub fn start_election(&self) -> Vec<RequestVoteRequest> {
681 let mut persistent = self.persistent.write();
682 let mut volatile = self.volatile.write();
683
684 persistent.current_term += 1;
686 persistent.grant_vote(self.node_id());
687
688 self.persist_state(persistent.current_term, persistent.voted_for);
690
691 volatile.become_candidate();
693
694 *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
696
697 let term = persistent.current_term;
698 let log = self.log.read();
699 let last_log_index = log.last_index();
700 let last_log_term = log.last_term();
701
702 let _span =
703 tracing::info_span!("raft_election", node_id = self.node_id(), term = term).entered();
704
705 let metrics = crate::metrics::global();
707 metrics.set_current_term(term);
708 metrics.inc_elections_started();
709
710 info!(
711 node_id = self.node_id(),
712 candidate_term = term,
713 log_length = last_log_index,
714 "Started election"
715 );
716
717 self.config
719 .peers
720 .iter()
721 .filter(|&&peer| peer != self.node_id())
722 .map(|&peer| {
723 RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
724 })
725 .collect()
726 }
727
728 pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
730 let should_become_leader = {
731 let mut persistent = self.persistent.write();
732 let mut volatile = self.volatile.write();
733
734 if !volatile.is_candidate() {
736 return false;
737 }
738
739 if resp.term > persistent.current_term {
741 let from_term = persistent.current_term;
742 persistent.update_term(resp.term);
743 self.persist_state(persistent.current_term, persistent.voted_for);
744 volatile.become_follower(None);
745 *self.candidate_state.write() = None;
746 crate::metrics::global().set_current_term(persistent.current_term);
747 debug!(
748 node_id = self.node_id(),
749 from_term = from_term,
750 to_term = persistent.current_term,
751 "Stepped down to follower (higher term in vote response)"
752 );
753 return false;
754 }
755
756 if resp.term < persistent.current_term {
758 return false;
759 }
760
761 if resp.vote_granted {
763 let mut candidate_state_guard = self.candidate_state.write();
764 if let Some(candidate_state) = candidate_state_guard.as_mut() {
765 candidate_state.record_vote(from);
766
767 info!(
768 node_id = self.node_id(),
769 from = from,
770 votes = candidate_state.vote_count(),
771 quorum = self.config.quorum_size(),
772 "Received vote"
773 );
774
775 candidate_state.has_quorum(self.config.quorum_size())
777 } else {
778 false
779 }
780 } else {
781 false
782 }
783 };
784
785 if should_become_leader {
787 let votes = self
788 .candidate_state
789 .read()
790 .as_ref()
791 .map(|cs| cs.vote_count())
792 .unwrap_or(0);
793 info!(
794 node_id = self.node_id(),
795 term = self.current_term(),
796 votes_received = votes,
797 "Won election with quorum"
798 );
799 self.become_leader();
800 return true;
801 }
802
803 false
804 }
805
806 fn become_leader(&self) {
808 let mut volatile = self.volatile.write();
809 volatile.become_leader();
810
811 let log = self.log.read();
812 let last_log_index = log.last_index();
813
814 *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
816 *self.candidate_state.write() = None;
817
818 let persistent = self.persistent.read();
819 let term = persistent.current_term;
820
821 self.fencing_token_state.bump_term_token(term as u32);
823
824 let prev_leader = {
826 let mut fo = self.failover_coordinator.write();
827 let old = fo.leader_hint();
828 fo.set_leader(self.node_id());
829 old
830 };
831
832 let metrics = crate::metrics::global();
834 metrics.set_current_term(term);
835 metrics.inc_leader_changes();
836
837 if let Some(am) = &self.alert_manager {
839 am.emit(AlertEvent::LeaderChanged {
840 old_leader: prev_leader,
841 new_leader: self.node_id(),
842 });
843 }
844
845 info!(
846 node_id = self.node_id(),
847 term,
848 voted_for = ?persistent.voted_for,
849 peer_count = self.config.peers.len(),
850 "Became leader"
851 );
852 }
853
854 pub fn issue_fencing_token(&self) -> Option<FencingToken> {
858 if !self.volatile.read().is_leader() {
859 return None;
860 }
861 Some(self.fencing_token_state.issue_token())
862 }
863
864 pub fn validate_fencing_token(&self, token: &FencingToken) -> RaftResult<()> {
869 let current_term = self.current_term();
870 if token.term() as u64 == current_term {
871 Ok(())
872 } else {
873 Err(RaftError::StaleTerm {
874 current: current_term,
875 received: token.term() as u64,
876 })
877 }
878 }
879
880 pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
882 let volatile = self.volatile.read();
883 if !volatile.is_leader() {
884 return Vec::new();
885 }
886 drop(volatile);
887
888 let term = self.current_term();
889 let log = self.log.read();
890 let leader_commit = log.commit_index();
891
892 self.config
893 .peers
894 .iter()
895 .filter(|&&peer| peer != self.node_id())
896 .map(|&peer| {
897 let prev_log_index = log.last_index();
898 let prev_log_term = log.last_term();
899
900 let req = AppendEntriesRequest::heartbeat(
901 term,
902 self.node_id(),
903 prev_log_index,
904 prev_log_term,
905 leader_commit,
906 );
907
908 (peer, req)
909 })
910 .collect()
911 }
912
913 pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
915 let volatile = self.volatile.read();
916 if !volatile.is_leader() {
917 return Vec::new();
918 }
919 drop(volatile);
920
921 let leader_state_guard = self.leader_state.read();
922 let leader_state = match leader_state_guard.as_ref() {
923 Some(state) => state,
924 None => return Vec::new(),
925 };
926
927 let term = self.current_term();
928 let log = self.log.read();
929 let leader_commit = log.commit_index();
930
931 self.config
932 .peers
933 .iter()
934 .filter(|&&peer| peer != self.node_id())
935 .filter_map(|&peer| {
936 let next_index = leader_state.get_next_index(peer);
937
938 if next_index > log.last_index() {
939 return None;
940 }
941
942 let prev_log_index = next_index.saturating_sub(1);
943 let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
944
945 let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
946
947 if entries.is_empty() {
948 return None;
949 }
950
951 let req = AppendEntriesRequest::new(
952 term,
953 self.node_id(),
954 prev_log_index,
955 prev_log_term,
956 entries,
957 leader_commit,
958 );
959
960 Some((peer, req))
961 })
962 .collect()
963 }
964
965 pub fn replicate_to_followers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
982 self.create_replication_requests()
983 }
984
985 pub fn create_replication_request_for(&self, peer: NodeId) -> Option<AppendEntriesRequest> {
990 let volatile = self.volatile.read();
991 if !volatile.is_leader() {
992 return None;
993 }
994 drop(volatile);
995
996 let leader_state_guard = self.leader_state.read();
997 let leader_state = leader_state_guard.as_ref()?;
998
999 let term = self.current_term();
1000 let log = self.log.read();
1001 let leader_commit = log.commit_index();
1002
1003 let next_index = leader_state.get_next_index(peer);
1004
1005 if next_index > log.last_index() {
1006 return None;
1008 }
1009
1010 let prev_log_index = next_index.saturating_sub(1);
1011 let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
1012
1013 let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
1014
1015 if entries.is_empty() {
1016 return None;
1017 }
1018
1019 Some(AppendEntriesRequest::new(
1020 term,
1021 self.node_id(),
1022 prev_log_index,
1023 prev_log_term,
1024 entries,
1025 leader_commit,
1026 ))
1027 }
1028
1029 pub fn handle_replication_response(
1031 &self,
1032 from: NodeId,
1033 resp: AppendEntriesResponse,
1034 ) -> RaftResult<()> {
1035 let mut persistent = self.persistent.write();
1036 let mut volatile = self.volatile.write();
1037
1038 if !volatile.is_leader() {
1040 return Ok(());
1041 }
1042
1043 if resp.term > persistent.current_term {
1045 let from_term = persistent.current_term;
1046 persistent.update_term(resp.term);
1047 self.persist_state(persistent.current_term, persistent.voted_for);
1048 volatile.become_follower(None);
1049 *self.leader_state.write() = None;
1050 self.snapshot_streamers.write().clear();
1051 crate::metrics::global().set_current_term(persistent.current_term);
1052 info!(
1053 node_id = self.node_id(),
1054 from_term = from_term,
1055 to_term = persistent.current_term,
1056 "Stepped down: leader to follower (higher term in replication response)"
1057 );
1058 return Ok(());
1059 }
1060
1061 drop(persistent);
1062 drop(volatile);
1063
1064 let mut leader_state_guard = self.leader_state.write();
1065 let leader_state = match leader_state_guard.as_mut() {
1066 Some(state) => state,
1067 None => return Ok(()),
1068 };
1069
1070 if resp.success {
1071 leader_state.update_success(from, resp.last_log_index);
1073
1074 debug!(
1075 node_id = self.node_id(),
1076 peer = from,
1077 match_index = resp.last_log_index,
1078 "Replication successful"
1079 );
1080
1081 {
1083 const SLOW_REPLICATION_THRESHOLD: u64 = 100;
1084 let committed = self.log.read().commit_index();
1085 let follower_match = resp.last_log_index;
1086 if committed > follower_match {
1087 let lag = committed - follower_match;
1088 if lag > SLOW_REPLICATION_THRESHOLD {
1089 if let Some(am) = &self.alert_manager {
1090 am.emit(AlertEvent::SlowReplication {
1091 follower: from,
1092 lag_entries: lag,
1093 });
1094 }
1095 }
1096 }
1097 }
1098
1099 let config_state = self.config_state.read().clone();
1102 let new_commit = leader_state.calculate_commit_index_joint(
1103 self.node_id(),
1104 self.log.read().last_index(),
1105 &config_state,
1106 );
1107
1108 let committed_advanced = {
1109 let mut log = self.log.write();
1110 if new_commit > log.commit_index() {
1111 if let Some(term) = log.get_term(new_commit) {
1113 if term == self.current_term() {
1114 let old_commit = log.commit_index();
1115 log.set_commit_index(new_commit)?;
1116 crate::metrics::global().set_commit_index(new_commit);
1117 crate::metrics::global().set_log_entry_count(log.last_index());
1118 info!(
1119 node_id = self.node_id(),
1120 old_commit_index = old_commit,
1121 new_commit_index = new_commit,
1122 "Advanced commit index"
1123 );
1124 true
1125 } else {
1126 false
1127 }
1128 } else {
1129 false
1130 }
1131 } else {
1132 false
1133 }
1134 };
1135 if committed_advanced {
1136 if let Err(e) = self.apply_committed_entries() {
1137 warn!(node_id = self.node_id(), error = ?e, "Failed to apply committed entries");
1138 }
1139 }
1140 } else {
1141 if resp.conflict_index.is_some() || resp.conflict_term.is_some() {
1144 leader_state.update_failure_with_hint(
1145 from,
1146 resp.conflict_index,
1147 resp.conflict_term,
1148 resp.last_log_index,
1149 );
1150 } else {
1151 leader_state.update_failure(from);
1152 }
1153
1154 warn!(
1155 node_id = self.node_id(),
1156 peer = from,
1157 next_index = leader_state.get_next_index(from),
1158 conflict_index = ?resp.conflict_index,
1159 conflict_term = ?resp.conflict_term,
1160 "Replication failed, will retry with adjusted next_index"
1161 );
1162 }
1163
1164 Ok(())
1165 }
1166
1167 pub fn maybe_create_snapshot(&self, state_machine_data: Vec<u8>) -> RaftResult<bool> {
1175 let mut snap_guard = self.snapshot_manager.write();
1176 let manager = match snap_guard.as_mut() {
1177 Some(m) => m,
1178 None => return Ok(false),
1179 };
1180
1181 let log = self.log.read();
1182 let entries_since = log.entries_since_snapshot();
1183
1184 if !manager.should_snapshot(entries_since) {
1185 return Ok(false);
1186 }
1187
1188 let applied_index = log.applied_index();
1189 if applied_index == 0 {
1190 return Ok(false);
1191 }
1192
1193 let applied_term = match log.get_term(applied_index) {
1194 Some(t) => t,
1195 None => {
1196 let (snap_idx, snap_term) = log.get_snapshot_point();
1198 if applied_index == snap_idx {
1199 snap_term
1200 } else {
1201 return Err(RaftError::LogInconsistency {
1202 reason: format!(
1203 "Cannot determine term for applied index {}",
1204 applied_index
1205 ),
1206 });
1207 }
1208 }
1209 };
1210
1211 drop(log);
1212
1213 manager.create_snapshot(state_machine_data, applied_index, applied_term)?;
1214
1215 let mut log = self.log.write();
1217 log.compact_until(applied_index, applied_term)?;
1218
1219 info!(
1220 node_id = self.node_id(),
1221 snapshot_index = applied_index,
1222 snapshot_term = applied_term,
1223 "Created snapshot and compacted log"
1224 );
1225
1226 Ok(true)
1227 }
1228
1229 pub fn auto_snapshot_if_needed<F>(
1237 &self,
1238 policy: &SnapshotPolicy,
1239 state_machine_data_fn: F,
1240 ) -> RaftResult<bool>
1241 where
1242 F: FnOnce() -> RaftResult<Vec<u8>>,
1243 {
1244 let log = self.log.read();
1245 let entries_since = log.entries_since_snapshot();
1246 let applied_index = log.applied_index();
1247
1248 if !policy.should_snapshot(entries_since, applied_index) {
1249 return Ok(false);
1250 }
1251
1252 if applied_index == 0 {
1253 return Ok(false);
1254 }
1255
1256 let applied_term = match log.get_term(applied_index) {
1257 Some(t) => t,
1258 None => {
1259 let (snap_idx, snap_term) = log.get_snapshot_point();
1260 if applied_index == snap_idx {
1261 snap_term
1262 } else {
1263 return Err(RaftError::LogInconsistency {
1264 reason: format!(
1265 "Cannot determine term for applied index {}",
1266 applied_index
1267 ),
1268 });
1269 }
1270 }
1271 };
1272
1273 drop(log);
1274
1275 let data = state_machine_data_fn()?;
1276
1277 let mut snap_guard = self.snapshot_manager.write();
1278 let manager = match snap_guard.as_mut() {
1279 Some(m) => m,
1280 None => return Ok(false),
1281 };
1282
1283 manager.create_snapshot(data, applied_index, applied_term)?;
1284 drop(snap_guard);
1285
1286 let mut log = self.log.write();
1287 log.compact_until(applied_index, applied_term)?;
1288
1289 info!(
1290 node_id = self.node_id(),
1291 snapshot_index = applied_index,
1292 snapshot_term = applied_term,
1293 entries_compacted = entries_since,
1294 "Auto-snapshot triggered and log compacted"
1295 );
1296
1297 Ok(true)
1298 }
1299
1300 pub fn handle_install_snapshot(
1305 &self,
1306 req: InstallSnapshotRequest,
1307 ) -> RaftResult<InstallSnapshotResponse> {
1308 let mut persistent = self.persistent.write();
1309 let mut volatile = self.volatile.write();
1310
1311 debug!(
1312 node_id = self.node_id(),
1313 leader = req.leader_id,
1314 term = req.term,
1315 last_included_index = req.last_included_index,
1316 last_included_term = req.last_included_term,
1317 offset = req.offset,
1318 done = req.done,
1319 "Received InstallSnapshot"
1320 );
1321
1322 if req.term > persistent.current_term {
1324 let from_term = persistent.current_term;
1325 persistent.update_term(req.term);
1326 self.persist_state(persistent.current_term, persistent.voted_for);
1327 volatile.become_follower(Some(req.leader_id));
1328 *self.leader_state.write() = None;
1329 *self.candidate_state.write() = None;
1330 self.snapshot_streamers.write().clear();
1331 self.snapshot_stream_receivers.write().clear();
1332 debug!(
1333 node_id = self.node_id(),
1334 from_term = from_term,
1335 to_term = persistent.current_term,
1336 leader_id = req.leader_id,
1337 "Stepped down to follower (higher term in InstallSnapshot)"
1338 );
1339 }
1340
1341 if req.term < persistent.current_term {
1343 warn!(
1344 node_id = self.node_id(),
1345 leader = req.leader_id,
1346 current_term = persistent.current_term,
1347 request_term = req.term,
1348 "Rejecting InstallSnapshot: stale term"
1349 );
1350 return Ok(InstallSnapshotResponse::new(persistent.current_term));
1351 }
1352
1353 *self.last_heartbeat.write() = Instant::now();
1355 volatile.become_follower(Some(req.leader_id));
1356 *self.candidate_state.write() = None;
1357
1358 let current_term = persistent.current_term;
1359 drop(persistent);
1360 drop(volatile);
1361
1362 let snapshot_dir = self.config.snapshot_dir.clone();
1366
1367 if let Some(ref dir) = snapshot_dir {
1368 let mut receivers = self.snapshot_stream_receivers.write();
1370
1371 let receiver = match receivers.entry(req.leader_id) {
1373 std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
1374 std::collections::hash_map::Entry::Vacant(v) => {
1375 let r = SnapshotStreamReceiver::new(
1376 dir,
1377 req.last_included_index,
1378 req.last_included_term,
1379 )?;
1380 v.insert(r)
1381 }
1382 };
1383
1384 let result = receiver.receive_chunk(&req)?;
1385
1386 if let Some(final_path) = result {
1387 receivers.remove(&req.leader_id);
1389 drop(receivers);
1390
1391 let snapshot_data = std::fs::read(&final_path).map_err(|e| {
1393 crate::error::RaftError::StorageError {
1394 message: format!(
1395 "Failed to read received snapshot file '{}': {}",
1396 final_path.display(),
1397 e
1398 ),
1399 }
1400 })?;
1401
1402 let snapshot = Snapshot::new(
1403 req.last_included_index,
1404 req.last_included_term,
1405 snapshot_data,
1406 );
1407
1408 let mut snap_guard = self.snapshot_manager.write();
1409 if let Some(manager) = snap_guard.as_mut() {
1410 manager.install_snapshot(snapshot)?;
1411 }
1412 drop(snap_guard);
1413
1414 let mut log = self.log.write();
1416 log.install_snapshot(req.last_included_index, req.last_included_term);
1417
1418 info!(
1419 node_id = self.node_id(),
1420 last_included_index = req.last_included_index,
1421 last_included_term = req.last_included_term,
1422 "Installed snapshot from leader (streaming path)"
1423 );
1424 }
1425 } else {
1426 let mut receiver_guard = self.snapshot_receiver.write();
1428
1429 if req.offset == 0 {
1430 *receiver_guard = Some(SnapshotReceiver::new(
1431 req.last_included_index,
1432 req.last_included_term,
1433 ));
1434 }
1435
1436 let receiver = match receiver_guard.as_mut() {
1437 Some(r) => r,
1438 None => {
1439 warn!(
1440 node_id = self.node_id(),
1441 offset = req.offset,
1442 "Received non-initial snapshot chunk without active receiver"
1443 );
1444 return Ok(InstallSnapshotResponse::new(current_term));
1445 }
1446 };
1447
1448 let completed = receiver.receive_chunk(&req)?;
1449
1450 if let Some(snapshot) = completed {
1451 *receiver_guard = None;
1452 drop(receiver_guard);
1453
1454 let mut snap_guard = self.snapshot_manager.write();
1455 if let Some(manager) = snap_guard.as_mut() {
1456 manager.install_snapshot(snapshot)?;
1457 }
1458
1459 let mut log = self.log.write();
1460 log.install_snapshot(req.last_included_index, req.last_included_term);
1461
1462 info!(
1463 node_id = self.node_id(),
1464 last_included_index = req.last_included_index,
1465 last_included_term = req.last_included_term,
1466 "Installed snapshot from leader (in-memory path)"
1467 );
1468 }
1469 }
1470
1471 Ok(InstallSnapshotResponse::new(current_term))
1472 }
1473
1474 pub fn prepare_install_snapshot(
1486 &self,
1487 target_peer: NodeId,
1488 ) -> RaftResult<Option<InstallSnapshotRequest>> {
1489 if !self.volatile.read().is_leader() {
1491 return Ok(None);
1492 }
1493
1494 let snap_meta = {
1498 let snap_guard = self.snapshot_manager.read();
1499 match snap_guard.as_ref() {
1500 Some(m) => match m.get_latest_metadata() {
1501 Some(meta) => meta.clone(),
1502 None => return Ok(None),
1503 },
1504 None => return Ok(None),
1505 }
1506 };
1507
1508 let next_index = {
1510 let leader_guard = self.leader_state.read();
1511 match leader_guard.as_ref() {
1512 Some(ls) => ls.get_next_index(target_peer),
1513 None => return Ok(None),
1515 }
1516 };
1517
1518 if next_index > snap_meta.last_included_index {
1519 self.snapshot_streamers.write().remove(&target_peer);
1521 return Ok(None);
1522 }
1523
1524 let term = self.current_term();
1525 let threshold = self.config.snapshot_chunk_threshold_bytes;
1526
1527 if snap_meta.size_bytes <= threshold {
1528 let snapshot = {
1530 let snap_guard = self.snapshot_manager.read();
1531 match snap_guard.as_ref() {
1532 Some(m) => m.load_latest()?.ok_or_else(|| RaftError::StorageError {
1533 message: "snapshot disappeared between metadata fetch and load".into(),
1534 })?,
1535 None => return Ok(None),
1536 }
1537 };
1538 return Ok(Some(InstallSnapshotRequest::new_complete(
1539 term,
1540 self.node_id(),
1541 snap_meta.last_included_index,
1542 snap_meta.last_included_term,
1543 snapshot.data,
1544 )));
1545 }
1546
1547 let chunk_size = self.config.snapshot_chunk_size_bytes;
1549
1550 let data_path_for_new = {
1554 let snap_guard = self.snapshot_manager.read();
1555 match snap_guard.as_ref() {
1556 Some(m) => m
1557 .latest_data_path()
1558 .ok_or_else(|| RaftError::StorageError {
1559 message: "snapshot data path not found for chunked transfer".into(),
1560 })?,
1561 None => return Ok(None),
1562 }
1563 };
1564
1565 let mut streamers = self.snapshot_streamers.write();
1566 let streamer = match streamers.entry(target_peer) {
1567 std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
1568 std::collections::hash_map::Entry::Vacant(v) => {
1569 v.insert(crate::snapshot::SnapshotStreamer::new(
1570 data_path_for_new,
1571 snap_meta.clone(),
1572 chunk_size,
1573 )?)
1574 }
1575 };
1576
1577 let req = streamer.next_chunk_for_rpc(term, self.node_id())?;
1578
1579 let is_done = req.as_ref().is_some_and(|r| r.done);
1581 drop(streamers);
1582 if is_done {
1583 self.snapshot_streamers.write().remove(&target_peer);
1584 }
1585
1586 Ok(req)
1587 }
1588
1589 pub fn follower_needs_snapshot(&self, peer: NodeId) -> bool {
1593 let leader_state_guard = self.leader_state.read();
1594 let leader_state = match leader_state_guard.as_ref() {
1595 Some(s) => s,
1596 None => return false,
1597 };
1598
1599 let next_index = leader_state.get_next_index(peer);
1600 let log = self.log.read();
1601 let (snap_idx, _) = log.get_snapshot_point();
1602
1603 snap_idx > 0 && next_index <= snap_idx
1604 }
1605
1606 pub fn add_node(&self, node_id: NodeId, address: String) -> RaftResult<()> {
1614 self.propose_membership_change(MembershipChange::AddNode { node_id, address })
1615 }
1616
1617 pub fn remove_node(&self, node_id: NodeId) -> RaftResult<()> {
1622 self.propose_membership_change(MembershipChange::RemoveNode { node_id })
1623 }
1624
1625 pub fn cluster_members(&self) -> Vec<(NodeId, String)> {
1627 self.config_state.read().all_members()
1628 }
1629
1630 pub fn is_in_joint_consensus(&self) -> bool {
1632 self.config_state.read().is_joint()
1633 }
1634
1635 pub fn membership_version(&self) -> u64 {
1637 self.config_state.read().version()
1638 }
1639
1640 pub fn propose_membership_change(&self, change: MembershipChange) -> RaftResult<()> {
1648 let volatile = self.volatile.read();
1650 if !volatile.is_leader() {
1651 return Err(RaftError::NotLeader {
1652 leader_id: volatile.leader_id,
1653 });
1654 }
1655 drop(volatile);
1656
1657 let mut cs = self.config_state.write();
1658
1659 if cs.is_joint() {
1661 return Err(RaftError::MembershipChangeInProgress);
1662 }
1663
1664 let current = match &*cs {
1665 ConfigState::Stable(c) => c.clone(),
1666 ConfigState::Joint { .. } => return Err(RaftError::MembershipChangeInProgress),
1667 };
1668
1669 let new_config = match &change {
1671 MembershipChange::AddNode { node_id, address } => {
1672 if current.contains(*node_id) {
1673 return Err(RaftError::NodeAlreadyMember { node_id: *node_id });
1674 }
1675 current.with_added_member(*node_id, address.clone())
1676 }
1677 MembershipChange::RemoveNode { node_id } => {
1678 if !current.contains(*node_id) {
1679 return Err(RaftError::NodeNotMember { node_id: *node_id });
1680 }
1681 current.without_member(*node_id)
1682 }
1683 };
1684
1685 *cs = ConfigState::Joint {
1687 old: current.clone(),
1688 new: new_config.clone(),
1689 };
1690
1691 info!(
1692 node_id = self.node_id(),
1693 change = ?change,
1694 old_version = current.version(),
1695 new_version = new_config.version(),
1696 "Entered joint consensus"
1697 );
1698
1699 self.update_leader_state_for_config(&cs);
1702
1703 let term = self.current_term();
1705 let mut log = self.log.write();
1706 let _index = log.append(term, Command::from_str("__membership_joint__"));
1707
1708 Ok(())
1709 }
1710
1711 pub fn commit_membership_change(&self) -> RaftResult<()> {
1716 let mut cs = self.config_state.write();
1717 let new_config = match &*cs {
1718 ConfigState::Joint { new, .. } => new.clone(),
1719 ConfigState::Stable(_) => {
1720 return Ok(());
1722 }
1723 };
1724
1725 *cs = ConfigState::Stable(new_config.clone());
1726
1727 info!(
1728 node_id = self.node_id(),
1729 version = new_config.version(),
1730 members = ?new_config.member_ids(),
1731 "Committed membership change, now stable"
1732 );
1733
1734 self.update_leader_state_for_config(&cs);
1736
1737 let term = self.current_term();
1739 let mut log = self.log.write();
1740 let _index = log.append(term, Command::from_str("__membership_stable__"));
1741
1742 if !new_config.contains(self.node_id()) {
1744 drop(cs);
1745 drop(log);
1746 self.step_down();
1747 }
1748
1749 Ok(())
1750 }
1751
1752 pub fn has_quorum(&self, responding_nodes: &HashSet<NodeId>) -> bool {
1755 self.config_state.read().has_quorum(responding_nodes)
1756 }
1757
1758 pub fn is_stepping_down(&self) -> bool {
1760 *self.stepping_down.read()
1761 }
1762
1763 pub fn attach_placement_scheduler(
1770 self: &Arc<Self>,
1771 scheduler: crate::placement_scheduler::PlacementScheduler,
1772 ) {
1773 let handle = scheduler.handle();
1774 tokio::spawn(scheduler.run());
1775 let mut guard = self.placement_scheduler_handle.write();
1776 if let Some(old) = guard.take() {
1777 old.stop();
1778 }
1779 *guard = Some(handle);
1780 }
1781
1782 fn step_down(&self) {
1784 let mut volatile = self.volatile.write();
1785 volatile.become_follower(None);
1786 *self.leader_state.write() = None;
1787 *self.candidate_state.write() = None;
1788 *self.stepping_down.write() = true;
1789 self.snapshot_streamers.write().clear();
1790 self.snapshot_stream_receivers.write().clear();
1791 if let Some(handle) = self.placement_scheduler_handle.write().take() {
1792 handle.stop();
1793 }
1794
1795 info!(
1796 node_id = self.node_id(),
1797 "Stepping down -- removed from cluster"
1798 );
1799 }
1800
1801 fn update_leader_state_for_config(&self, cs: &ConfigState) {
1804 let mut ls_guard = self.leader_state.write();
1805 let ls = match ls_guard.as_mut() {
1806 Some(s) => s,
1807 None => return,
1808 };
1809
1810 let all_ids = cs.all_member_ids();
1811 let log = self.log.read();
1812 let last_log_index = log.last_index();
1813
1814 for &id in &all_ids {
1816 if id == self.node_id() {
1817 continue;
1818 }
1819 ls.next_index.entry(id).or_insert(last_log_index + 1);
1820 ls.match_index.entry(id).or_insert(0);
1821 }
1822
1823 ls.next_index
1825 .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1826 ls.match_index
1827 .retain(|id, _| all_ids.contains(id) || *id == self.node_id());
1828 }
1829
1830 pub fn election_timeout_elapsed(&self) -> bool {
1844 let last_heartbeat = *self.last_heartbeat.read();
1845 let static_timeout = self.config.random_election_timeout();
1846 let dynamic_min_ms = self
1848 .dynamic_config
1849 .read()
1850 .heartbeat_interval_ms
1851 .saturating_mul(2);
1852 let effective_timeout = static_timeout.max(Duration::from_millis(dynamic_min_ms));
1853 last_heartbeat.elapsed() >= effective_timeout
1854 }
1855
1856 pub fn reset_election_timer(&self) {
1858 *self.last_heartbeat.write() = Instant::now();
1859 }
1860
1861 pub fn get_leader_hint(&self) -> Option<NodeId> {
1863 self.volatile.read().leader_id
1864 }
1865
1866 pub fn get_dynamic_config(&self) -> DynamicConfig {
1868 self.dynamic_config.read().clone()
1869 }
1870
1871 pub fn update_dynamic_config(&self, new_config: DynamicConfig) {
1881 *self.dynamic_config.write() = new_config;
1882 debug!(
1883 node_id = self.node_id(),
1884 heartbeat_interval_ms = self.dynamic_config.read().heartbeat_interval_ms,
1885 compaction_threshold = self.dynamic_config.read().compaction_threshold,
1886 "Dynamic configuration updated"
1887 );
1888 }
1889
1890 pub fn set_state_machine(&self, sm: impl StateMachine + 'static) -> RaftResult<()> {
1899 let mut guard = self.state_machine.lock();
1900 *guard = Some(Box::new(sm));
1901 Ok(())
1902 }
1903
1904 fn apply_committed_entries(&self) -> RaftResult<()> {
1912 let entries_to_apply = {
1915 let log = self.log.read();
1916 if log.applied_index() >= log.commit_index() {
1917 return Ok(());
1918 }
1919 log.get_uncommitted_entries()
1920 };
1921
1922 if entries_to_apply.is_empty() {
1923 return Ok(());
1924 }
1925
1926 let mut sm_guard = self.state_machine.lock();
1927 let sm = match sm_guard.as_mut() {
1928 Some(s) => s,
1929 None => {
1930 let mut log = self.log.write();
1932 if let Some(last) = entries_to_apply.last() {
1933 log.set_applied_index(last.index)?;
1934 }
1935 return Ok(());
1936 }
1937 };
1938
1939 for entry in &entries_to_apply {
1940 sm.apply(entry).map_err(|e| {
1941 warn!(
1942 node_id = self.node_id(),
1943 index = entry.index,
1944 error = ?e,
1945 "State machine failed to apply entry"
1946 );
1947 e
1948 })?;
1949 self.log.write().set_applied_index(entry.index)?;
1952 }
1953
1954 Ok(())
1955 }
1956
1957 pub fn set_alert_manager(&mut self, am: Arc<AlertManager>) {
1964 self.alert_manager = Some(am);
1965 }
1966
1967 pub fn trigger_failover_election(&self) -> Vec<RequestVoteRequest> {
1971 let state = self.volatile.read().node_state;
1972 if state != NodeState::Follower {
1973 return Vec::new();
1974 }
1975 self.start_election()
1976 }
1977}
1978
1979#[path = "node_wal_replay.rs"]
1984mod wal_replay;
1985use wal_replay::replay_wal_into_log;
1986
1987#[cfg(test)]
1988#[path = "node_tests.rs"]
1989mod tests;
1990
1991#[cfg(test)]
1992#[path = "node_tests_advanced.rs"]
1993mod tests_advanced;
1994
1995#[cfg(test)]
1996#[path = "integration_tests.rs"]
1997mod integration_tests;
1998#[cfg(test)]
1999#[path = "node_snapshot_tests.rs"]
2000mod snapshot_tests;