amaters_cluster/
state.rs

1//! Raft persistent and volatile state
2
3use crate::types::{LogIndex, NodeId, NodeState, Term};
4use std::collections::HashMap;
5
6/// Persistent state on all servers (must be persisted before responding to RPCs)
7#[derive(Debug, Clone)]
8pub struct PersistentState {
9    /// Latest term server has seen (initialized to 0, increases monotonically)
10    pub current_term: Term,
11    /// Candidate ID that received vote in current term (None if none)
12    pub voted_for: Option<NodeId>,
13}
14
15impl PersistentState {
16    /// Create a new persistent state
17    pub fn new() -> Self {
18        Self {
19            current_term: 0,
20            voted_for: None,
21        }
22    }
23
24    /// Update the current term (clears voted_for if term increases)
25    pub fn update_term(&mut self, new_term: Term) {
26        if new_term > self.current_term {
27            self.current_term = new_term;
28            self.voted_for = None;
29        }
30    }
31
32    /// Grant a vote to a candidate
33    pub fn grant_vote(&mut self, candidate_id: NodeId) {
34        self.voted_for = Some(candidate_id);
35    }
36}
37
38impl Default for PersistentState {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44/// Volatile state on all servers
45#[derive(Debug, Clone)]
46pub struct VolatileState {
47    /// Current node state
48    pub node_state: NodeState,
49    /// Current known leader ID (None if unknown)
50    pub leader_id: Option<NodeId>,
51}
52
53impl VolatileState {
54    /// Create a new volatile state
55    pub fn new() -> Self {
56        Self {
57            node_state: NodeState::Follower,
58            leader_id: None,
59        }
60    }
61
62    /// Transition to follower state
63    pub fn become_follower(&mut self, leader_id: Option<NodeId>) {
64        self.node_state = NodeState::Follower;
65        self.leader_id = leader_id;
66    }
67
68    /// Transition to candidate state
69    pub fn become_candidate(&mut self) {
70        self.node_state = NodeState::Candidate;
71        self.leader_id = None;
72    }
73
74    /// Transition to leader state
75    pub fn become_leader(&mut self) {
76        self.node_state = NodeState::Leader;
77        self.leader_id = None;
78    }
79
80    /// Check if this node is the leader
81    pub fn is_leader(&self) -> bool {
82        self.node_state == NodeState::Leader
83    }
84
85    /// Check if this node is a candidate
86    pub fn is_candidate(&self) -> bool {
87        self.node_state == NodeState::Candidate
88    }
89
90    /// Check if this node is a follower
91    pub fn is_follower(&self) -> bool {
92        self.node_state == NodeState::Follower
93    }
94}
95
96impl Default for VolatileState {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102/// Volatile state on leaders (reinitialized after election)
103#[derive(Debug, Clone)]
104pub struct LeaderState {
105    /// For each server, index of the next log entry to send to that server
106    pub next_index: HashMap<NodeId, LogIndex>,
107    /// For each server, index of highest log entry known to be replicated on server
108    pub match_index: HashMap<NodeId, LogIndex>,
109}
110
111impl LeaderState {
112    /// Create a new leader state
113    pub fn new(peers: &[NodeId], last_log_index: LogIndex) -> Self {
114        let mut next_index = HashMap::new();
115        let mut match_index = HashMap::new();
116
117        for &peer in peers {
118            next_index.insert(peer, last_log_index + 1);
119            match_index.insert(peer, 0);
120        }
121
122        Self {
123            next_index,
124            match_index,
125        }
126    }
127
128    /// Update next_index for a peer after successful replication
129    pub fn update_success(&mut self, peer: NodeId, match_idx: LogIndex) {
130        self.match_index.insert(peer, match_idx);
131        self.next_index.insert(peer, match_idx + 1);
132    }
133
134    /// Update next_index for a peer after failed replication
135    pub fn update_failure(&mut self, peer: NodeId) {
136        if let Some(next_idx) = self.next_index.get_mut(&peer) {
137            if *next_idx > 1 {
138                *next_idx -= 1;
139            }
140        }
141    }
142
143    /// Get next_index for a peer
144    pub fn get_next_index(&self, peer: NodeId) -> LogIndex {
145        self.next_index.get(&peer).copied().unwrap_or(1)
146    }
147
148    /// Get match_index for a peer
149    pub fn get_match_index(&self, peer: NodeId) -> LogIndex {
150        self.match_index.get(&peer).copied().unwrap_or(0)
151    }
152
153    /// Calculate the commit index based on match_index values
154    /// Returns the highest index that is replicated on a majority of servers
155    pub fn calculate_commit_index(&self, current_index: LogIndex, quorum_size: usize) -> LogIndex {
156        // Collect all match indices
157        let mut indices: Vec<LogIndex> = self.match_index.values().copied().collect();
158        indices.sort_unstable();
159        indices.reverse();
160
161        // Find the index at the quorum position
162        // quorum_size includes the leader, so we need quorum_size - 1 followers
163        if indices.len() + 1 >= quorum_size {
164            let quorum_idx = quorum_size.saturating_sub(2);
165            if quorum_idx < indices.len() {
166                return indices[quorum_idx].min(current_index);
167            }
168        }
169
170        0
171    }
172}
173
174/// Volatile state for candidates (during election)
175#[derive(Debug, Clone)]
176pub struct CandidateState {
177    /// Votes received from peers (including self)
178    pub votes_received: Vec<NodeId>,
179}
180
181impl CandidateState {
182    /// Create a new candidate state
183    pub fn new(self_id: NodeId) -> Self {
184        Self {
185            votes_received: vec![self_id],
186        }
187    }
188
189    /// Record a vote from a peer
190    pub fn record_vote(&mut self, peer: NodeId) {
191        if !self.votes_received.contains(&peer) {
192            self.votes_received.push(peer);
193        }
194    }
195
196    /// Check if we have a quorum of votes
197    pub fn has_quorum(&self, quorum_size: usize) -> bool {
198        self.votes_received.len() >= quorum_size
199    }
200
201    /// Get the number of votes received
202    pub fn vote_count(&self) -> usize {
203        self.votes_received.len()
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh persistent state is at term 0 with no vote recorded.
    #[test]
    fn test_persistent_state_new() {
        let fresh = PersistentState::new();
        assert_eq!(fresh.voted_for, None);
        assert_eq!(fresh.current_term, 0);
    }

    /// Moving to a newer term clears the recorded vote.
    #[test]
    fn test_persistent_state_update_term() {
        let mut persistent = PersistentState::new();
        persistent.voted_for = Some(1);

        persistent.update_term(5);

        assert_eq!(persistent.voted_for, None);
        assert_eq!(persistent.current_term, 5);
    }

    /// Granting a vote stores the candidate ID.
    #[test]
    fn test_persistent_state_grant_vote() {
        let mut persistent = PersistentState::new();
        persistent.grant_vote(2);
        assert_eq!(persistent.voted_for, Some(2));
    }

    /// A fresh volatile state is a follower with no known leader.
    #[test]
    fn test_volatile_state_new() {
        let fresh = VolatileState::new();
        assert_eq!(fresh.leader_id, None);
        assert_eq!(fresh.node_state, NodeState::Follower);
    }

    /// Walks follower -> candidate -> leader -> follower and checks the role
    /// and known leader after each step.
    #[test]
    fn test_volatile_state_transitions() {
        let mut volatile = VolatileState::new();

        volatile.become_candidate();
        assert!(volatile.is_candidate());
        assert_eq!(volatile.leader_id, None);

        volatile.become_leader();
        assert!(volatile.is_leader());
        assert_eq!(volatile.leader_id, None);

        volatile.become_follower(Some(5));
        assert!(volatile.is_follower());
        assert_eq!(volatile.leader_id, Some(5));
    }

    /// Peers start with next_index = last_log_index + 1 and match_index = 0.
    #[test]
    fn test_leader_state_new() {
        let peers = [1, 2, 3];
        let replication = LeaderState::new(&peers, 10);

        assert_eq!(replication.get_match_index(1), 0);
        assert_eq!(replication.get_next_index(1), 11);
    }

    /// A successful acknowledgement sets match_index and next_index.
    #[test]
    fn test_leader_state_update_success() {
        let peers = [1, 2, 3];
        let mut replication = LeaderState::new(&peers, 10);

        replication.update_success(1, 12);

        assert_eq!(replication.get_match_index(1), 12);
        assert_eq!(replication.get_next_index(1), 13);
    }

    /// A failed attempt steps next_index back by one.
    #[test]
    fn test_leader_state_update_failure() {
        let peers = [1, 2, 3];
        let mut replication = LeaderState::new(&peers, 10);

        replication.update_failure(1);

        assert_eq!(replication.get_next_index(1), 10);
    }

    /// Five-node cluster (leader + four peers), quorum of three.
    #[test]
    fn test_leader_state_calculate_commit_index() {
        let peers = [2, 3, 4, 5];
        let mut replication = LeaderState::new(&peers, 10);

        // (peer, acknowledged index)
        for &(peer, acked) in &[(2, 8), (3, 9), (4, 7), (5, 6)] {
            replication.update_success(peer, acked);
        }

        // Match indices sorted high to low: [9, 8, 7, 6].
        // The leader fills one quorum slot, so position 3 - 2 = 1 decides: 8.
        assert_eq!(replication.calculate_commit_index(10, 3), 8);
    }

    /// A new candidate has already counted its own vote.
    #[test]
    fn test_candidate_state_new() {
        let tally = CandidateState::new(1);
        assert!(tally.votes_received.contains(&1));
        assert_eq!(tally.vote_count(), 1);
    }

    /// Votes from distinct peers accumulate.
    #[test]
    fn test_candidate_state_record_vote() {
        let mut tally = CandidateState::new(1);
        for peer in 2..=3 {
            tally.record_vote(peer);
        }

        assert_eq!(tally.vote_count(), 3);
        assert!(tally.has_quorum(2));
    }

    /// Quorum is reached exactly when enough votes have been recorded.
    #[test]
    fn test_candidate_state_has_quorum() {
        let mut tally = CandidateState::new(1);
        assert!(tally.has_quorum(1));
        assert!(!tally.has_quorum(2));

        tally.record_vote(2);
        assert!(tally.has_quorum(2));
    }
}