amaters_cluster/
node.rs

1//! Main Raft node implementation
2
3use crate::error::{RaftError, RaftResult};
4use crate::log::{Command, RaftLog};
5use crate::rpc::{
6    AppendEntriesRequest, AppendEntriesResponse, RequestVoteRequest, RequestVoteResponse,
7};
8use crate::state::{CandidateState, LeaderState, PersistentState, VolatileState};
9use crate::types::{LogIndex, NodeId, NodeState, RaftConfig, Term};
10use parking_lot::RwLock;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tracing::{debug, info, warn};
14
15/// A Raft consensus node
16pub struct RaftNode {
17    /// Node configuration
18    config: Arc<RaftConfig>,
19    /// Persistent state
20    persistent: Arc<RwLock<PersistentState>>,
21    /// Volatile state
22    volatile: Arc<RwLock<VolatileState>>,
23    /// Raft log
24    log: Arc<RwLock<RaftLog>>,
25    /// Leader-specific state
26    leader_state: Arc<RwLock<Option<LeaderState>>>,
27    /// Candidate-specific state
28    candidate_state: Arc<RwLock<Option<CandidateState>>>,
29    /// Last time we received a message from the leader
30    last_heartbeat: Arc<RwLock<Instant>>,
31}
32
33impl RaftNode {
34    /// Create a new Raft node
35    pub fn new(config: RaftConfig) -> RaftResult<Self> {
36        // Validate configuration
37        config
38            .validate()
39            .map_err(|msg| RaftError::ConfigError { message: msg })?;
40
41        Ok(Self {
42            config: Arc::new(config),
43            persistent: Arc::new(RwLock::new(PersistentState::new())),
44            volatile: Arc::new(RwLock::new(VolatileState::new())),
45            log: Arc::new(RwLock::new(RaftLog::new())),
46            leader_state: Arc::new(RwLock::new(None)),
47            candidate_state: Arc::new(RwLock::new(None)),
48            last_heartbeat: Arc::new(RwLock::new(Instant::now())),
49        })
50    }
51
52    /// Get the current node ID
53    pub fn node_id(&self) -> NodeId {
54        self.config.node_id
55    }
56
57    /// Get the current term
58    pub fn current_term(&self) -> Term {
59        self.persistent.read().current_term
60    }
61
62    /// Get the current state
63    pub fn state(&self) -> NodeState {
64        self.volatile.read().node_state
65    }
66
67    /// Get the current leader ID
68    pub fn leader_id(&self) -> Option<NodeId> {
69        self.volatile.read().leader_id
70    }
71
72    /// Check if this node is the leader
73    pub fn is_leader(&self) -> bool {
74        self.volatile.read().is_leader()
75    }
76
77    /// Get the commit index
78    pub fn commit_index(&self) -> LogIndex {
79        self.log.read().commit_index()
80    }
81
82    /// Get the last log index
83    pub fn last_log_index(&self) -> LogIndex {
84        self.log.read().last_index()
85    }
86
87    /// Append a command to the log (leader only)
88    pub fn propose(&self, command: Command) -> RaftResult<LogIndex> {
89        let volatile = self.volatile.read();
90        if !volatile.is_leader() {
91            return Err(RaftError::NotLeader {
92                leader_id: volatile.leader_id,
93            });
94        }
95        drop(volatile);
96
97        let term = self.current_term();
98        let mut log = self.log.write();
99        let index = log.append(term, command);
100
101        info!(
102            node_id = self.node_id(),
103            index = index,
104            term = term,
105            "Proposed new entry"
106        );
107
108        Ok(index)
109    }
110
111    /// Handle a RequestVote RPC
112    pub fn handle_request_vote(&self, req: RequestVoteRequest) -> RequestVoteResponse {
113        let mut persistent = self.persistent.write();
114        let mut volatile = self.volatile.write();
115
116        debug!(
117            node_id = self.node_id(),
118            candidate = req.candidate_id,
119            term = req.term,
120            "Received RequestVote"
121        );
122
123        // Update term if necessary
124        if req.term > persistent.current_term {
125            persistent.update_term(req.term);
126            volatile.become_follower(None);
127            *self.leader_state.write() = None;
128            *self.candidate_state.write() = None;
129        }
130
131        // Reject if term is stale
132        if req.term < persistent.current_term {
133            warn!(
134                node_id = self.node_id(),
135                candidate = req.candidate_id,
136                current_term = persistent.current_term,
137                request_term = req.term,
138                "Rejecting vote: stale term"
139            );
140            return RequestVoteResponse::rejected(persistent.current_term);
141        }
142
143        // Check if we've already voted
144        if let Some(voted_for) = persistent.voted_for {
145            if voted_for != req.candidate_id {
146                warn!(
147                    node_id = self.node_id(),
148                    candidate = req.candidate_id,
149                    voted_for = voted_for,
150                    "Rejecting vote: already voted"
151                );
152                return RequestVoteResponse::rejected(persistent.current_term);
153            }
154        }
155
156        // Check if candidate's log is at least as up-to-date as ours
157        let log = self.log.read();
158        let our_last_index = log.last_index();
159        let our_last_term = log.last_term();
160
161        let log_ok = req.last_log_term > our_last_term
162            || (req.last_log_term == our_last_term && req.last_log_index >= our_last_index);
163
164        if !log_ok {
165            warn!(
166                node_id = self.node_id(),
167                candidate = req.candidate_id,
168                our_last_index = our_last_index,
169                our_last_term = our_last_term,
170                candidate_last_index = req.last_log_index,
171                candidate_last_term = req.last_log_term,
172                "Rejecting vote: candidate log not up-to-date"
173            );
174            return RequestVoteResponse::rejected(persistent.current_term);
175        }
176
177        // Grant vote
178        persistent.grant_vote(req.candidate_id);
179        *self.last_heartbeat.write() = Instant::now();
180
181        info!(
182            node_id = self.node_id(),
183            candidate = req.candidate_id,
184            term = req.term,
185            "Granted vote"
186        );
187
188        RequestVoteResponse::granted(persistent.current_term)
189    }
190
191    /// Handle an AppendEntries RPC
192    pub fn handle_append_entries(&self, req: AppendEntriesRequest) -> AppendEntriesResponse {
193        let mut persistent = self.persistent.write();
194        let mut volatile = self.volatile.write();
195
196        debug!(
197            node_id = self.node_id(),
198            leader = req.leader_id,
199            term = req.term,
200            entries = req.entries.len(),
201            "Received AppendEntries"
202        );
203
204        // Update term if necessary
205        if req.term > persistent.current_term {
206            persistent.update_term(req.term);
207            volatile.become_follower(Some(req.leader_id));
208            *self.leader_state.write() = None;
209            *self.candidate_state.write() = None;
210        }
211
212        // Reject if term is stale
213        if req.term < persistent.current_term {
214            warn!(
215                node_id = self.node_id(),
216                leader = req.leader_id,
217                current_term = persistent.current_term,
218                request_term = req.term,
219                "Rejecting AppendEntries: stale term"
220            );
221            return AppendEntriesResponse::rejected(persistent.current_term);
222        }
223
224        // Update heartbeat and leader
225        *self.last_heartbeat.write() = Instant::now();
226        volatile.become_follower(Some(req.leader_id));
227        *self.candidate_state.write() = None;
228
229        drop(persistent);
230        drop(volatile);
231
232        // Handle the entries
233        let mut log = self.log.write();
234        let our_last_index = log.last_index();
235
236        // Check if we have the previous log entry
237        if req.prev_log_index > 0 && !log.matches(req.prev_log_index, req.prev_log_term) {
238            // Find conflict index and term
239            let conflict_index = req.prev_log_index.min(our_last_index);
240            let conflict_term = log.get_term(conflict_index).unwrap_or(0);
241
242            warn!(
243                node_id = self.node_id(),
244                prev_log_index = req.prev_log_index,
245                prev_log_term = req.prev_log_term,
246                conflict_index = conflict_index,
247                conflict_term = conflict_term,
248                "Rejecting AppendEntries: log inconsistency"
249            );
250
251            return AppendEntriesResponse::failure(
252                self.current_term(),
253                our_last_index,
254                conflict_index,
255                conflict_term,
256            );
257        }
258
259        // Append entries if any
260        if !req.entries.is_empty() {
261            // Delete conflicting entries
262            let first_new_index = req.entries[0].index;
263            if first_new_index <= our_last_index {
264                if let Err(e) = log.truncate_from(first_new_index) {
265                    warn!(
266                        node_id = self.node_id(),
267                        error = ?e,
268                        "Failed to truncate log"
269                    );
270                    return AppendEntriesResponse::rejected(self.current_term());
271                }
272            }
273
274            // Append new entries
275            if let Err(e) = log.append_entries(req.entries) {
276                warn!(
277                    node_id = self.node_id(),
278                    error = ?e,
279                    "Failed to append entries"
280                );
281                return AppendEntriesResponse::rejected(self.current_term());
282            }
283        }
284
285        // Update commit index
286        if req.leader_commit > log.commit_index() {
287            let new_commit = req.leader_commit.min(log.last_index());
288            if let Err(e) = log.set_commit_index(new_commit) {
289                warn!(
290                    node_id = self.node_id(),
291                    error = ?e,
292                    "Failed to update commit index"
293                );
294            } else {
295                debug!(
296                    node_id = self.node_id(),
297                    commit_index = new_commit,
298                    "Updated commit index"
299                );
300            }
301        }
302
303        AppendEntriesResponse::success(self.current_term(), log.last_index())
304    }
305
306    /// Start an election (transition to candidate)
307    pub fn start_election(&self) -> Vec<RequestVoteRequest> {
308        let mut persistent = self.persistent.write();
309        let mut volatile = self.volatile.write();
310
311        // Increment term and vote for self
312        persistent.current_term += 1;
313        persistent.grant_vote(self.node_id());
314
315        // Transition to candidate
316        volatile.become_candidate();
317
318        // Initialize candidate state
319        *self.candidate_state.write() = Some(CandidateState::new(self.node_id()));
320
321        let term = persistent.current_term;
322        let log = self.log.read();
323        let last_log_index = log.last_index();
324        let last_log_term = log.last_term();
325
326        info!(node_id = self.node_id(), term = term, "Started election");
327
328        // Send RequestVote to all peers
329        self.config
330            .peers
331            .iter()
332            .filter(|&&peer| peer != self.node_id())
333            .map(|&peer| {
334                RequestVoteRequest::new(term, self.node_id(), last_log_index, last_log_term)
335            })
336            .collect()
337    }
338
339    /// Handle a vote response during election
340    pub fn handle_vote_response(&self, from: NodeId, resp: RequestVoteResponse) -> bool {
341        let should_become_leader = {
342            let mut persistent = self.persistent.write();
343            let mut volatile = self.volatile.write();
344
345            // Check if we're still a candidate
346            if !volatile.is_candidate() {
347                return false;
348            }
349
350            // Update term if necessary
351            if resp.term > persistent.current_term {
352                persistent.update_term(resp.term);
353                volatile.become_follower(None);
354                *self.candidate_state.write() = None;
355                return false;
356            }
357
358            // Ignore stale responses
359            if resp.term < persistent.current_term {
360                return false;
361            }
362
363            // Record vote if granted
364            if resp.vote_granted {
365                let mut candidate_state_guard = self.candidate_state.write();
366                if let Some(candidate_state) = candidate_state_guard.as_mut() {
367                    candidate_state.record_vote(from);
368
369                    info!(
370                        node_id = self.node_id(),
371                        from = from,
372                        votes = candidate_state.vote_count(),
373                        quorum = self.config.quorum_size(),
374                        "Received vote"
375                    );
376
377                    // Check if we have a quorum
378                    candidate_state.has_quorum(self.config.quorum_size())
379                } else {
380                    false
381                }
382            } else {
383                false
384            }
385        };
386
387        // Become leader outside of locks to prevent deadlock
388        if should_become_leader {
389            self.become_leader();
390            return true;
391        }
392
393        false
394    }
395
396    /// Transition to leader
397    fn become_leader(&self) {
398        let mut volatile = self.volatile.write();
399        volatile.become_leader();
400
401        let log = self.log.read();
402        let last_log_index = log.last_index();
403
404        // Initialize leader state
405        *self.leader_state.write() = Some(LeaderState::new(&self.config.peers, last_log_index));
406        *self.candidate_state.write() = None;
407
408        info!(
409            node_id = self.node_id(),
410            term = self.current_term(),
411            "Became leader"
412        );
413    }
414
415    /// Create heartbeat messages for all peers
416    pub fn create_heartbeats(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
417        let volatile = self.volatile.read();
418        if !volatile.is_leader() {
419            return Vec::new();
420        }
421        drop(volatile);
422
423        let term = self.current_term();
424        let log = self.log.read();
425        let leader_commit = log.commit_index();
426
427        self.config
428            .peers
429            .iter()
430            .filter(|&&peer| peer != self.node_id())
431            .map(|&peer| {
432                let prev_log_index = log.last_index();
433                let prev_log_term = log.last_term();
434
435                let req = AppendEntriesRequest::heartbeat(
436                    term,
437                    self.node_id(),
438                    prev_log_index,
439                    prev_log_term,
440                    leader_commit,
441                );
442
443                (peer, req)
444            })
445            .collect()
446    }
447
448    /// Create replication messages for all peers
449    pub fn create_replication_requests(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
450        let volatile = self.volatile.read();
451        if !volatile.is_leader() {
452            return Vec::new();
453        }
454        drop(volatile);
455
456        let leader_state_guard = self.leader_state.read();
457        let leader_state = match leader_state_guard.as_ref() {
458            Some(state) => state,
459            None => return Vec::new(),
460        };
461
462        let term = self.current_term();
463        let log = self.log.read();
464        let leader_commit = log.commit_index();
465
466        self.config
467            .peers
468            .iter()
469            .filter(|&&peer| peer != self.node_id())
470            .filter_map(|&peer| {
471                let next_index = leader_state.get_next_index(peer);
472
473                if next_index > log.last_index() {
474                    return None;
475                }
476
477                let prev_log_index = if next_index > 1 { next_index - 1 } else { 0 };
478                let prev_log_term = log.get_term(prev_log_index).unwrap_or(0);
479
480                let entries = log.get_entries_from(next_index, self.config.max_entries_per_message);
481
482                if entries.is_empty() {
483                    return None;
484                }
485
486                let req = AppendEntriesRequest::new(
487                    term,
488                    self.node_id(),
489                    prev_log_index,
490                    prev_log_term,
491                    entries,
492                    leader_commit,
493                );
494
495                Some((peer, req))
496            })
497            .collect()
498    }
499
500    /// Handle a replication response
501    pub fn handle_replication_response(
502        &self,
503        from: NodeId,
504        resp: AppendEntriesResponse,
505    ) -> RaftResult<()> {
506        let mut persistent = self.persistent.write();
507        let mut volatile = self.volatile.write();
508
509        // Check if we're still the leader
510        if !volatile.is_leader() {
511            return Ok(());
512        }
513
514        // Update term if necessary
515        if resp.term > persistent.current_term {
516            persistent.update_term(resp.term);
517            volatile.become_follower(None);
518            *self.leader_state.write() = None;
519            return Ok(());
520        }
521
522        drop(persistent);
523        drop(volatile);
524
525        let mut leader_state_guard = self.leader_state.write();
526        let leader_state = match leader_state_guard.as_mut() {
527            Some(state) => state,
528            None => return Ok(()),
529        };
530
531        if resp.success {
532            // Update match_index and next_index
533            leader_state.update_success(from, resp.last_log_index);
534
535            debug!(
536                node_id = self.node_id(),
537                peer = from,
538                match_index = resp.last_log_index,
539                "Replication successful"
540            );
541
542            // Try to advance commit index
543            let new_commit = leader_state
544                .calculate_commit_index(self.log.read().last_index(), self.config.quorum_size());
545
546            let mut log = self.log.write();
547            if new_commit > log.commit_index() {
548                // Verify that the entry at new_commit has the current term
549                if let Some(term) = log.get_term(new_commit) {
550                    if term == self.current_term() {
551                        log.set_commit_index(new_commit)?;
552                        info!(
553                            node_id = self.node_id(),
554                            commit_index = new_commit,
555                            "Advanced commit index"
556                        );
557                    }
558                }
559            }
560        } else {
561            // Replication failed, decrement next_index
562            leader_state.update_failure(from);
563
564            warn!(
565                node_id = self.node_id(),
566                peer = from,
567                next_index = leader_state.get_next_index(from),
568                "Replication failed, will retry"
569            );
570        }
571
572        Ok(())
573    }
574
575    /// Check if election timeout has elapsed
576    pub fn election_timeout_elapsed(&self) -> bool {
577        let last_heartbeat = *self.last_heartbeat.read();
578        let timeout = self.config.random_election_timeout();
579        last_heartbeat.elapsed() >= timeout
580    }
581
582    /// Reset election timer
583    pub fn reset_election_timer(&self) {
584        *self.last_heartbeat.write() = Instant::now();
585    }
586}
587
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a node that belongs to the three-member cluster `{1, 2, 3}`.
    fn create_test_node(node_id: NodeId) -> RaftNode {
        let config = RaftConfig::new(node_id, vec![1, 2, 3]);
        RaftNode::new(config).expect("Failed to create node")
    }

    #[test]
    fn test_new_node() {
        let node = create_test_node(1);
        assert_eq!(node.node_id(), 1);
        assert_eq!(node.current_term(), 0);
        assert_eq!(node.state(), NodeState::Follower);
        assert_eq!(node.leader_id(), None);
    }

    #[test]
    fn test_start_election() {
        let node = create_test_node(1);
        let requests = node.start_election();

        assert_eq!(node.state(), NodeState::Candidate);
        assert_eq!(node.current_term(), 1);
        assert_eq!(requests.len(), 2); // 3 peers - self
    }

    #[test]
    fn test_handle_vote_granted() {
        let node = create_test_node(1);
        node.start_election();

        // With 3 nodes, quorum is 2 (self + 1 vote)
        // After start_election, node has 1 vote (self)
        // After first granted vote, node has 2 votes = quorum
        let resp = RequestVoteResponse::granted(1);
        let became_leader = node.handle_vote_response(2, resp);
        assert!(became_leader);
        assert_eq!(node.state(), NodeState::Leader);
    }

    #[test]
    fn test_propose_as_follower() {
        let node = create_test_node(1);
        let result = node.propose(Command::from_str("test"));
        assert!(result.is_err());
    }

    #[test]
    fn test_propose_as_leader() {
        let node = create_test_node(1);
        node.start_election();

        // Become leader
        let resp = RequestVoteResponse::granted(1);
        node.handle_vote_response(2, resp);

        // Now we can propose
        let result = node.propose(Command::from_str("test"));
        assert!(result.is_ok());
    }

    #[test]
    fn test_request_vote_granted_once_per_term() {
        let node = create_test_node(1);

        let first = node.handle_request_vote(RequestVoteRequest::new(1, 2, 0, 0));
        assert!(first.vote_granted);
        assert_eq!(first.term, 1);
        assert_eq!(node.current_term(), 1);
        assert_eq!(node.state(), NodeState::Follower);

        // A second candidate in the same term must be refused
        let second = node.handle_request_vote(RequestVoteRequest::new(1, 3, 0, 0));
        assert!(!second.vote_granted);
    }

    #[test]
    fn test_append_entries_makes_candidate_follow_leader() {
        let node = create_test_node(1);
        node.start_election(); // candidate in term 1

        let resp = node.handle_append_entries(AppendEntriesRequest::heartbeat(1, 2, 0, 0, 0));
        assert!(resp.success);
        assert_eq!(node.state(), NodeState::Follower);
        assert_eq!(node.leader_id(), Some(2));
        assert_eq!(node.current_term(), 1);
    }

    #[test]
    fn test_append_entries_rejects_stale_term() {
        let node = create_test_node(1);
        node.start_election();
        node.start_election(); // candidate in term 2

        let resp = node.handle_append_entries(AppendEntriesRequest::heartbeat(1, 2, 0, 0, 0));
        assert!(!resp.success);
        assert_eq!(resp.term, 2);
        assert_eq!(node.state(), NodeState::Candidate);
    }

    #[test]
    fn test_vote_response_ignored_after_stepping_down() {
        let node = create_test_node(1);
        node.start_election(); // term 1

        // A higher-term candidate turns us back into a follower
        node.handle_request_vote(RequestVoteRequest::new(2, 3, 0, 0));
        assert_eq!(node.state(), NodeState::Follower);

        // A late vote from the abandoned term-1 election must not promote us
        let became_leader = node.handle_vote_response(2, RequestVoteResponse::granted(1));
        assert!(!became_leader);
        assert_eq!(node.state(), NodeState::Follower);
    }

    #[test]
    fn test_heartbeats_only_from_leader() {
        let node = create_test_node(1);
        assert!(node.create_heartbeats().is_empty());

        node.start_election();
        assert!(node.handle_vote_response(2, RequestVoteResponse::granted(1)));

        let mut targets: Vec<NodeId> = node
            .create_heartbeats()
            .iter()
            .map(|(peer, _)| *peer)
            .collect();
        targets.sort_unstable();
        assert_eq!(targets, vec![2, 3]);
    }
}