1use crate::error::{RaftError, RaftResult};
4use crate::log::{Command, RaftLog};
5use crate::rpc::{
6 AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
7};
8use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
9use crate::types::{LogIndex, NodeId, NodeState, RaftConfig, Term};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tracing::{debug, info, warn};
14
15pub struct RaftNode {
17 config: Arc<RaftConfig>,
19 persistent: Arc<RwLock<PersistentState>>,
21 volatile: Arc<RwLock<VolatileState>>,
23 log: Arc<RwLock<RaftLog>>,
25 leader_state: Arc<RwLock<Option<LeaderState>>>,
27 candidate_state: Arc<RwLock<Option<CandidateState>>>,
29 last_heartbeat: Arc<RwLock<Instant>>,
31}
32
33impl RaftNode {
34 pub fn new(config: RaftConfig) -> RaftResult<Self> {
36 config
38 .validate()
39 .map_err(|msg| RaftError::ConfigError { message: msg })?;
40
41 Ok(Self {
42 config: Arc::new(config),
43 persistent: Arc::new(RwLock::new(PersistentState::new())),
44 volatile: Arc::new(RwLock::new(VolatileState::new())),
45 log: Arc::new(RwLock::new(RaftLog::new())),
46 leader_state: Arc::new(RwLock::new(None)),
47 candidate_state: Arc::new(RwLock::new(None)),
48 last_heartbeat: Arc::new(RwLock::new(Instant::now())),
49 })
50 }
51
52 pub fn node_id(&self) -> NodeId {
54 self.config.node_id
55 }
56
57 pub fn current_term(&self) -> Term {
59 self.persistent.read().current_term
60 }
61
62 pub fn state(&self) -> NodeState {
64 self.volatile.read().node_state
65 }
66
67 pub fn leader_id(&self) -> Option<NodeId> {
69 self.volatile.read().leader_id
70 }
71
72 pub fn is_leader(&self) -> bool {
74 self.volatile.read().is_leader()
75 }
76
77 pub fn commit_index(&self) -> LogIndex {
79 self.log.read().commit_index()
80 }
81
82 pub fn last_log_index(&self) -> LogIndex {
84 self.log.read().last_index()
85 }
86
87 pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
89 let volatile = self.volatile.read();
90 if !volatile.is_leader() {
91 return Err(RaftError::NotLeader {
92 leader_id: volatile.leader_id,
93 });
94 }
95 drop(volatile);
96
97 let term = self.current_term();
98 let mut log = self.log.write();
99 let index = log.append(term, command);
100
101 info!(
102 node_id = self.node_id(),
103 index = index,
104 term = term,
105 "Proposed new entry"
106 );
107
108 Ok(index)
109 }
110
111 pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
113 let mut persistent = self.persistent.write();
114 let mut volatile = self.volatile.write();
115
116 debug!(
117 node_id = self.node_id(),
118 candidate = req.candidate_id,
119 term = req.term,
120 "Received RequestVote"
121 );
122
123 if req.term > persistent.current_term {
125 persistent.update_term(req.term);
126 volatile.become_follower(None);
127 *self.leader_state.write() = None;
128 *self.candidate_state.write() = None;
129 }
130
131 if req.term < persistent.current_term {
133 warn!(
134 node_id = self.node_id(),
135 candidate = req.candidate_id,
136 current_term = persistent.current_term,
137 request_term = req.term,
138 "Rejecting vote: stale term"
139 );
140 return RequestVoteResponse::rejected(persistent.current_term);
141 }
142
143 if let Some(voted_for) = persistent.voted_for {
145 if voted_for != req.candidate_id {
146 warn!(
147 node_id = self.node_id(),
148 candidate = req.candidate_id,
149 voted_for = voted_for,
150 "Rejecting vote: already voted"
151 );
152 return RequestVoteResponse::rejected(persistent.current_term);
153 }
154 }
155
156 let log = self.log.read();
158 let our_last_index = log.last_index();
159 let our_last_term = log.last_term();
160
161 let log_ok = req.last_log_term > our_last_term
162 || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);
163
164 if !log_ok {
165 warn!(
166 node_id = self.node_id(),
167 candidate = req.candidate_id,
168 our_last_index = our_last_index,
169 our_last_term = our_last_term,
170 candidate_last_index = req.last_log_index,
171 candidate_last_term = req.last_log_term,
172 "Rejecting vote: candidate log not up-to-date"
173 );
174 return RequestVoteResponse::rejected(persistent.current_term);
175 }
176
177 persistent.grant_vote(req.candidate_id);
179 *self.last_heartbeat.write() = Instant::now();
180
181 info!(
182 node_id = self.node_id(),
183 candidate = req.candidate_id,
184 term = req.term,
185 "Granted vote"
186 );
187
188 RequestVoteResponse::granted(persistent.current_term)
189 }
190
191 pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
193 let mut persistent = self.persistent.write();
194 let mut volatile = self.volatile.write();
195
196 debug!(
197 node_id = self.node_id(),
198 leader = req.leader_id,
199 term = req.term,
200 entries = req.entries.len(),
201 "Received AppendEntries"
202 );
203
204 if req.term > persistent.current_term {
206 persistent.update_term(req.term);
207 volatile.become_follower(Some(req.leader_id));
208 *self.leader_state.write() = None;
209 *self.candidate_state.write() = None;
210 }
211
212 if req.term < persistent.current_term {
214 warn!(
215 node_id = self.node_id(),
216 leader = req.leader_id,
217 current_term = persistent.current_term,
218 request_term = req.term,
219 "Rejecting AppendEntries: stale term"
220 );
221 return AppendEntriesResponse::rejected(persistent.current_term);
222 }
223
224 *self.last_heartbeat.write() = Instant::now();
226 volatile.become_follower(Some(req.leader_id));
227 *self.candidate_state.write() = None;
228
229 drop(persistent);
230 drop(volatile);
231
232 let mut log = self.log.write();
234 let our_last_index = log.last_index();
235
236 if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
238 let conflict_index = req.prev_log_index.min(our_last_index);
240 let conflict_term = log.get_term(conflict_index).unwrap_or(0);
241
242 warn!(
243 node_id = self.node_id(),
244 prev_log_index = req.prev_log_index,
245 prev_log_term = req.prev_log_term,
246 conflict_index = conflict_index,
247 conflict_term = conflict_term,
248 "Rejecting AppendEntries: log inconsistency"
249 );
250
251 return AppendEntriesResponse::failure(
252 self.current_term(),
253 our_last_index,
254 conflict_index,
255 conflict_term,
256 );
257 }
258
259 if !req.entries.is_empty() {
261 let first_new_index = req.entries[0].index;
263 if first_new_index <= our_last_index {
264 if let Err(e) = log.truncate_from(first_new_index) {
265 warn!(
266 node_id = self.node_id(),
267 error = ?e,
268 "Failed to truncate log"
269 );
270 return AppendEntriesResponse::rejected(self.current_term());
271 }
272 }
273
274 if let Err(e) = log.append_entries(req.entries) {
276 warn!(
277 node_id = self.node_id(),
278 error = ?e,
279 "Failed to append entries"
280 );
281 return AppendEntriesResponse::rejected(self.current_term());
282 }
283 }
284
285 if req.leader_commit > log.commit_index() {
287 let new_commit = req.leader_commit.min(log.last_index());
288 if let Err(e) = log.set_commit_index(new_commit) {
289 warn!(
290 node_id = self.node_id(),
291 error = ?e,
292 "Failed to update commit index"
293 );
294 } else {
295 debug!(
296 node_id = self.node_id(),
297 commit_index = new_commit,
298 "Updated commit index"
299 );
300 }
301 }
302
303 AppendEntriesResponse::success(self.current_term(), log.last_index())
304 }
305
306 pub fn start_election(&self) -> Vec<RequestVoteRequest> {
308 let mut persistent = self.persistent.write();
309 let mut volatile = self.volatile.write();
310
311 persistent.current_term += 1;
313 persistent.grant_vote(self.node_id());
314
315 volatile.become_candidate();
317
318 *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
320
321 let term = persistent.current_term;
322 let log = self.log.read();
323 let last_log_index = log.last_index();
324 let last_log_term = log.last_term();
325
326 info!(node_id = self.node_id(), term = term, "Started election");
327
328 self.config
330 .peers
331 .iter()
332 .filter(|&&peer| peer != self.node_id())
333 .map(|&peer| {
334 RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
335 })
336 .collect()
337 }
338
339 pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
341 let should_become_leader = {
342 let mut persistent = self.persistent.write();
343 let mut volatile = self.volatile.write();
344
345 if !volatile.is_candidate() {
347 return false;
348 }
349
350 if resp.term > persistent.current_term {
352 persistent.update_term(resp.term);
353 volatile.become_follower(None);
354 *self.candidate_state.write() = None;
355 return false;
356 }
357
358 if resp.term < persistent.current_term {
360 return false;
361 }
362
363 if resp.vote_granted {
365 let mut candidate_state_guard = self.candidate_state.write();
366 if let Some(candidate_state) = candidate_state_guard.as_mut() {
367 candidate_state.record_vote(from);
368
369 info!(
370 node_id = self.node_id(),
371 from = from,
372 votes = candidate_state.vote_count(),
373 quorum = self.config.quorum_size(),
374 "Received vote"
375 );
376
377 candidate_state.has_quorum(self.config.quorum_size())
379 } else {
380 false
381 }
382 } else {
383 false
384 }
385 };
386
387 if should_become_leader {
389 self.become_leader();
390 return true;
391 }
392
393 false
394 }
395
396 fn become_leader(&self) {
398 let mut volatile = self.volatile.write();
399 volatile.become_leader();
400
401 let log = self.log.read();
402 let last_log_index = log.last_index();
403
404 *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
406 *self.candidate_state.write() = None;
407
408 info!(
409 node_id = self.node_id(),
410 term = self.current_term(),
411 "Became leader"
412 );
413 }
414
415 pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
417 let volatile = self.volatile.read();
418 if !volatile.is_leader() {
419 return Vec::new();
420 }
421 drop(volatile);
422
423 let term = self.current_term();
424 let log = self.log.read();
425 let leader_commit = log.commit_index();
426
427 self.config
428 .peers
429 .iter()
430 .filter(|&&peer| peer != self.node_id())
431 .map(|&peer| {
432 let prev_log_index = log.last_index();
433 let prev_log_term = log.last_term();
434
435 let req = AppendEntriesRequest::heartbeat(
436 term,
437 self.node_id(),
438 prev_log_index,
439 prev_log_term,
440 leader_commit,
441 );
442
443 (peer, req)
444 })
445 .collect()
446 }
447
448 pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
450 let volatile = self.volatile.read();
451 if !volatile.is_leader() {
452 return Vec::new();
453 }
454 drop(volatile);
455
456 let leader_state_guard = self.leader_state.read();
457 let leader_state = match leader_state_guard.as_ref() {
458 Some(state) => state,
459 None => return Vec::new(),
460 };
461
462 let term = self.current_term();
463 let log = self.log.read();
464 let leader_commit = log.commit_index();
465
466 self.config
467 .peers
468 .iter()
469 .filter(|&&peer| peer != self.node_id())
470 .filter_map(|&peer| {
471 let next_index = leader_state.get_next_index(peer);
472
473 if next_index > log.last_index() {
474 return None;
475 }
476
477 let prev_log_index = if next_index > 1 { next_index - 1 } else { 0 };
478 let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
479
480 let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
481
482 if entries.is_empty() {
483 return None;
484 }
485
486 let req = AppendEntriesRequest::new(
487 term,
488 self.node_id(),
489 prev_log_index,
490 prev_log_term,
491 entries,
492 leader_commit,
493 );
494
495 Some((peer, req))
496 })
497 .collect()
498 }
499
500 pub fn handle_replication_response(
502 &self,
503 from: NodeId,
504 resp: AppendEntriesResponse,
505 ) -> RaftResult<()> {
506 let mut persistent = self.persistent.write();
507 let mut volatile = self.volatile.write();
508
509 if !volatile.is_leader() {
511 return Ok(());
512 }
513
514 if resp.term > persistent.current_term {
516 persistent.update_term(resp.term);
517 volatile.become_follower(None);
518 *self.leader_state.write() = None;
519 return Ok(());
520 }
521
522 drop(persistent);
523 drop(volatile);
524
525 let mut leader_state_guard = self.leader_state.write();
526 let leader_state = match leader_state_guard.as_mut() {
527 Some(state) => state,
528 None => return Ok(()),
529 };
530
531 if resp.success {
532 leader_state.update_success(from, resp.last_log_index);
534
535 debug!(
536 node_id = self.node_id(),
537 peer = from,
538 match_index = resp.last_log_index,
539 "Replication successful"
540 );
541
542 let new_commit = leader_state
544 .calculate_commit_index(self.log.read().last_index(), self.config.quorum_size());
545
546 let mut log = self.log.write();
547 if new_commit > log.commit_index() {
548 if let Some(term) = log.get_term(new_commit) {
550 if term == self.current_term() {
551 log.set_commit_index(new_commit)?;
552 info!(
553 node_id = self.node_id(),
554 commit_index = new_commit,
555 "Advanced commit index"
556 );
557 }
558 }
559 }
560 } else {
561 leader_state.update_failure(from);
563
564 warn!(
565 node_id = self.node_id(),
566 peer = from,
567 next_index = leader_state.get_next_index(from),
568 "Replication failed, will retry"
569 );
570 }
571
572 Ok(())
573 }
574
575 pub fn election_timeout_elapsed(&self) -> bool {
577 let last_heartbeat = *self.last_heartbeat.read();
578 let timeout = self.config.random_election_timeout();
579 last_heartbeat.elapsed() >= timeout
580 }
581
582 pub fn reset_election_timer(&self) {
584 *self.last_heartbeat.write() = Instant::now();
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591
592 fn create_test_node(node_id: NodeId) -> RaftNode {
593 let config = RaftConfig::new(node_id, vec![1, 2, 3]);
594 RaftNode::new(config).expect("Failed to create node")
595 }
596
597 #[test]
598 fn test_new_node() {
599 let node = create_test_node(1);
600 assert_eq!(node.node_id(), 1);
601 assert_eq!(node.current_term(), 0);
602 assert_eq!(node.state(), NodeState::Follower);
603 assert_eq!(node.leader_id(), None);
604 }
605
606 #[test]
607 fn test_start_election() {
608 let node = create_test_node(1);
609 let requests = node.start_election();
610
611 assert_eq!(node.state(), NodeState::Candidate);
612 assert_eq!(node.current_term(), 1);
613 assert_eq!(requests.len(), 2); }
615
616 #[test]
617 fn test_handle_vote_granted() {
618 let node = create_test_node(1);
619 node.start_election();
620
621 let resp = RequestVoteResponse::granted(1);
625 let became_leader = node.handle_vote_response(2, resp);
626 assert!(became_leader);
627 assert_eq!(node.state(), NodeState::Leader);
628 }
629
630 #[test]
631 fn test_propose_as_follower() {
632 let node = create_test_node(1);
633 let result = node.propose(Command::from_str("test"));
634 assert!(result.is_err());
635 }
636
637 #[test]
638 fn test_propose_as_leader() {
639 let node = create_test_node(1);
640 node.start_election();
641
642 let resp = RequestVoteResponse::granted(1);
644 node.handle_vote_response(2, resp);
645
646 let result = node.propose(Command::from_str("test"));
648 assert!(result.is_ok());
649 }
650}