// File: amaters_cluster/state.rs
//! Raft persistent and volatile state

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

use tracing::debug;

use crate::types::{FencingToken, LogIndex, NodeId, NodeState, Term};

/// Persistent state on all servers (must be persisted before responding to RPCs)
///
/// NOTE(review): this struct only models the values; the actual write to
/// stable storage is not visible in this module — confirm it happens before
/// any RPC response is sent.
#[derive(Debug, Clone)]
pub struct PersistentState {
    /// Latest term server has seen (initialized to 0, increases monotonically)
    pub current_term: Term,
    /// Candidate ID that received vote in current term (None if none)
    pub voted_for: Option<NodeId>,
}

17impl PersistentState {
18    /// Create a new persistent state
19    pub fn new() -> Self {
20        Self {
21            current_term: 0,
22            voted_for: None,
23        }
24    }
25
26    /// Update the current term (clears voted_for if term increases)
27    pub fn update_term(&mut self, new_term: Term) {
28        if new_term > self.current_term {
29            debug!(
30                old_term = self.current_term,
31                new_term = new_term,
32                "Persistent state: term updated, cleared voted_for"
33            );
34            self.current_term = new_term;
35            self.voted_for = None;
36        }
37    }
38
39    /// Grant a vote to a candidate
40    pub fn grant_vote(&mut self, candidate_id: NodeId) {
41        debug!(
42            candidate_id = candidate_id,
43            term = self.current_term,
44            "Persistent state: vote granted"
45        );
46        self.voted_for = Some(candidate_id);
47    }
48}
49
50impl Default for PersistentState {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
/// Volatile fencing-token state shared across the cluster node.
///
/// Stores the current packed fencing token as an `AtomicU64` so that concurrent
/// readers (e.g. storage guards) can check staleness without taking a lock.
/// The high 32 bits encode the Raft term; the low 32 bits encode the monotonic
/// write sequence within that term.
pub struct FencingTokenState {
    // Packed (term << 32 | seq); 0 means "no leader epoch yet".
    current_token: AtomicU64,
}

impl FencingTokenState {
    /// Create a new state with token = 0 (no leader epoch yet).
    pub fn new() -> Self {
        Self {
            current_token: AtomicU64::new(0),
        }
    }

    /// Atomically issue the next fencing token by incrementing the sequence.
    ///
    /// Intended to be called on every write from the current leader.
    ///
    /// NOTE(review): `fetch_add` returns the value *before* the increment,
    /// so the token handed out here is the pre-increment one. The first
    /// `issue_token` after `bump_term_token` therefore returns the same raw
    /// value the bump stored (seq 0 of the new term) — confirm this is the
    /// intended contract rather than an off-by-one.
    ///
    /// NOTE(review): after 2^32 issues within a single term the sequence
    /// increment would carry into the term bits; presumably unreachable in
    /// practice, but worth confirming there is a bound on writes per term.
    pub fn issue_token(&self) -> FencingToken {
        let raw = self.current_token.fetch_add(1, Ordering::SeqCst);
        FencingToken(raw)
    }

    /// Bump the token to a new leader term, resetting the sequence to zero.
    ///
    /// This must be called atomically when a node wins an election.
    /// The `SeqCst` store participates in the same total order as the
    /// `SeqCst` operations in `issue_token` / `current_raw`.
    pub fn bump_term_token(&self, new_term: u32) {
        let token = FencingToken::new_leader_term(new_term);
        self.current_token.store(token.raw(), Ordering::SeqCst);
    }

    /// Read the current raw token value (for serialisation / inspection).
    pub fn current_raw(&self) -> u64 {
        self.current_token.load(Ordering::SeqCst)
    }
}

96impl Default for FencingTokenState {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102impl std::fmt::Debug for FencingTokenState {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        let raw = self.current_token.load(Ordering::Relaxed);
105        let token = FencingToken(raw);
106        f.debug_struct("FencingTokenState")
107            .field("term", &token.term())
108            .field("seq", &token.seq())
109            .finish()
110    }
111}
112
/// Volatile state on all servers
#[derive(Debug, Clone)]
pub struct VolatileState {
    /// Current node state (follower / candidate / leader)
    pub node_state: NodeState,
    /// Current known leader ID (None if unknown)
    pub leader_id: Option<NodeId>,
}

122impl VolatileState {
123    /// Create a new volatile state
124    pub fn new() -> Self {
125        Self {
126            node_state: NodeState::Follower,
127            leader_id: None,
128        }
129    }
130
131    /// Transition to follower state
132    pub fn become_follower(&mut self, leader_id: Option<NodeId>) {
133        self.node_state = NodeState::Follower;
134        self.leader_id = leader_id;
135    }
136
137    /// Transition to candidate state
138    pub fn become_candidate(&mut self) {
139        self.node_state = NodeState::Candidate;
140        self.leader_id = None;
141    }
142
143    /// Transition to leader state
144    pub fn become_leader(&mut self) {
145        self.node_state = NodeState::Leader;
146        self.leader_id = None;
147    }
148
149    /// Check if this node is the leader
150    pub fn is_leader(&self) -> bool {
151        self.node_state == NodeState::Leader
152    }
153
154    /// Check if this node is a candidate
155    pub fn is_candidate(&self) -> bool {
156        self.node_state == NodeState::Candidate
157    }
158
159    /// Check if this node is a follower
160    pub fn is_follower(&self) -> bool {
161        self.node_state == NodeState::Follower
162    }
163}
164
165impl Default for VolatileState {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
/// Volatile state on leaders (reinitialized after election)
///
/// Both maps are keyed by peer `NodeId`; the leader itself has no entry.
#[derive(Debug, Clone)]
pub struct LeaderState {
    /// For each server, index of the next log entry to send to that server
    pub next_index: HashMap<NodeId, LogIndex>,
    /// For each server, index of highest log entry known to be replicated on server
    pub match_index: HashMap<NodeId, LogIndex>,
}

180impl LeaderState {
181    /// Create a new leader state
182    pub fn new(peers: &[NodeId], last_log_index: LogIndex) -> Self {
183        let mut next_index = HashMap::new();
184        let mut match_index = HashMap::new();
185
186        for &peer in peers {
187            next_index.insert(peer, last_log_index + 1);
188            match_index.insert(peer, 0);
189        }
190
191        Self {
192            next_index,
193            match_index,
194        }
195    }
196
197    /// Update next_index for a peer after successful replication
198    pub fn update_success(&mut self, peer: NodeId, match_idx: LogIndex) {
199        self.match_index.insert(peer, match_idx);
200        self.next_index.insert(peer, match_idx + 1);
201    }
202
203    /// Update next_index for a peer after failed replication
204    pub fn update_failure(&mut self, peer: NodeId) {
205        if let Some(next_idx) = self.next_index.get_mut(&peer) {
206            if *next_idx > 1 {
207                *next_idx -= 1;
208            }
209        }
210    }
211
212    /// Update next_index for a peer after failed replication using conflict hints.
213    ///
214    /// This implements the "fast backup" optimization from the Raft paper:
215    /// instead of decrementing next_index one at a time, we jump back to the
216    /// conflict point reported by the follower.
217    ///
218    /// - `conflict_index`: the first index of the conflicting term on the follower
219    /// - `conflict_term`: the term of the conflicting entry
220    /// - `follower_last_index`: the follower's last log index
221    ///
222    /// If the leader has entries with `conflict_term`, it sets `next_index` to
223    /// the index after its last entry of that term. Otherwise, it sets
224    /// `next_index` to `conflict_index`.
225    pub fn update_failure_with_hint(
226        &mut self,
227        peer: NodeId,
228        conflict_index: Option<LogIndex>,
229        _conflict_term: Option<Term>,
230        follower_last_index: LogIndex,
231    ) {
232        let new_next = match conflict_index {
233            Some(ci) if ci > 0 => {
234                // Jump back to the conflict index
235                ci
236            }
237            _ => {
238                // No conflict hint; fall back to follower's last index + 1
239                // (but at least 1)
240                (follower_last_index + 1).max(1)
241            }
242        };
243
244        // Ensure we never go backwards past 1
245        let clamped = new_next.max(1);
246
247        // Only update if this actually moves next_index backwards (or stays)
248        if let Some(next_idx) = self.next_index.get_mut(&peer) {
249            if clamped < *next_idx {
250                *next_idx = clamped;
251            } else {
252                // Fall back to simple decrement if hint doesn't help
253                if *next_idx > 1 {
254                    *next_idx -= 1;
255                }
256            }
257        } else {
258            self.next_index.insert(peer, clamped);
259        }
260    }
261
262    /// Calculate the commit index considering joint consensus.
263    ///
264    /// During joint consensus, an entry must be replicated to a majority of
265    /// **both** the old and new configurations. The leader itself counts toward
266    /// both configs.
267    pub fn calculate_commit_index_joint(
268        &self,
269        leader_id: NodeId,
270        current_last_index: LogIndex,
271        config_state: &crate::types::ConfigState,
272    ) -> LogIndex {
273        match config_state {
274            crate::types::ConfigState::Stable(config) => {
275                let quorum = config.quorum_size();
276                self.calculate_commit_index(current_last_index, quorum)
277            }
278            crate::types::ConfigState::Joint { old, new } => {
279                // For each config, count how many members have replicated
280                // each index. The leader is always up-to-date.
281                let old_commit = Self::quorum_index_for_config(
282                    old,
283                    leader_id,
284                    current_last_index,
285                    &self.match_index,
286                );
287                let new_commit = Self::quorum_index_for_config(
288                    new,
289                    leader_id,
290                    current_last_index,
291                    &self.match_index,
292                );
293
294                // Must be committed in both configs
295                old_commit.min(new_commit)
296            }
297        }
298    }
299
300    /// Find the highest index replicated to a majority of a single config.
301    fn quorum_index_for_config(
302        config: &crate::types::ClusterConfig,
303        leader_id: NodeId,
304        leader_last_index: LogIndex,
305        match_index: &HashMap<NodeId, LogIndex>,
306    ) -> LogIndex {
307        let member_ids = config.member_ids();
308        let quorum = config.quorum_size();
309
310        // Collect match indices for members of this config
311        let mut indices: Vec<LogIndex> = member_ids
312            .iter()
313            .map(|&id| {
314                if id == leader_id {
315                    leader_last_index
316                } else {
317                    match_index.get(&id).copied().unwrap_or(0)
318                }
319            })
320            .collect();
321
322        indices.sort_unstable();
323        indices.reverse();
324
325        // The index at position (quorum-1) is the highest index
326        // replicated to at least `quorum` members.
327        if indices.len() >= quorum && quorum > 0 {
328            indices[quorum - 1]
329        } else {
330            0
331        }
332    }
333
334    /// Get next_index for a peer
335    pub fn get_next_index(&self, peer: NodeId) -> LogIndex {
336        self.next_index.get(&peer).copied().unwrap_or(1)
337    }
338
339    /// Get match_index for a peer
340    pub fn get_match_index(&self, peer: NodeId) -> LogIndex {
341        self.match_index.get(&peer).copied().unwrap_or(0)
342    }
343
344    /// Calculate the commit index based on match_index values
345    /// Returns the highest index that is replicated on a majority of servers
346    pub fn calculate_commit_index(&self, current_index: LogIndex, quorum_size: usize) -> LogIndex {
347        // Collect all match indices
348        let mut indices: Vec<LogIndex> = self.match_index.values().copied().collect();
349        indices.sort_unstable();
350        indices.reverse();
351
352        // Find the index at the quorum position
353        // quorum_size includes the leader, so we need quorum_size - 1 followers
354        if indices.len() + 1 >= quorum_size {
355            let quorum_idx = quorum_size.saturating_sub(2);
356            if quorum_idx < indices.len() {
357                return indices[quorum_idx].min(current_index);
358            }
359        }
360
361        0
362    }
363}
364
/// Volatile state for candidates (during election)
#[derive(Debug, Clone)]
pub struct CandidateState {
    /// Votes received from peers (including self); kept duplicate-free
    pub votes_received: Vec<NodeId>,
}

372impl CandidateState {
373    /// Create a new candidate state
374    pub fn new(self_id: NodeId) -> Self {
375        Self {
376            votes_received: vec![self_id],
377        }
378    }
379
380    /// Record a vote from a peer
381    pub fn record_vote(&mut self, peer: NodeId) {
382        if !self.votes_received.contains(&peer) {
383            self.votes_received.push(peer);
384        }
385    }
386
387    /// Check if we have a quorum of votes
388    pub fn has_quorum(&self, quorum_size: usize) -> bool {
389        self.votes_received.len() >= quorum_size
390    }
391
392    /// Get the number of votes received
393    pub fn vote_count(&self) -> usize {
394        self.votes_received.len()
395    }
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_persistent_state_new() {
        let state = PersistentState::new();
        assert_eq!(state.current_term, 0);
        assert_eq!(state.voted_for, None);
    }

    #[test]
    fn test_persistent_state_update_term() {
        let mut state = PersistentState::new();
        state.voted_for = Some(1);
        state.update_term(5);

        // A higher term clears the previously recorded vote.
        assert_eq!(state.current_term, 5);
        assert_eq!(state.voted_for, None);
    }

    #[test]
    fn test_persistent_state_grant_vote() {
        let mut state = PersistentState::new();
        state.grant_vote(2);
        assert_eq!(state.voted_for, Some(2));
    }

    #[test]
    fn test_volatile_state_new() {
        let state = VolatileState::new();
        assert_eq!(state.node_state, NodeState::Follower);
        assert_eq!(state.leader_id, None);
    }

    #[test]
    fn test_volatile_state_transitions() {
        let mut state = VolatileState::new();

        state.become_candidate();
        assert!(state.is_candidate());
        assert_eq!(state.leader_id, None);

        state.become_leader();
        assert!(state.is_leader());
        assert_eq!(state.leader_id, None);

        state.become_follower(Some(5));
        assert!(state.is_follower());
        assert_eq!(state.leader_id, Some(5));
    }

    #[test]
    fn test_leader_state_new() {
        let peers = vec![1, 2, 3];
        let leader_state = LeaderState::new(&peers, 10);

        // next_index starts optimistically at last_log_index + 1.
        assert_eq!(leader_state.get_next_index(1), 11);
        assert_eq!(leader_state.get_match_index(1), 0);
    }

    #[test]
    fn test_leader_state_update_success() {
        let peers = vec![1, 2, 3];
        let mut leader_state = LeaderState::new(&peers, 10);

        leader_state.update_success(1, 12);
        assert_eq!(leader_state.get_next_index(1), 13);
        assert_eq!(leader_state.get_match_index(1), 12);
    }

    #[test]
    fn test_leader_state_update_failure() {
        let peers = vec![1, 2, 3];
        let mut leader_state = LeaderState::new(&peers, 10);

        // One failure decrements next_index from 11 to 10.
        leader_state.update_failure(1);
        assert_eq!(leader_state.get_next_index(1), 10);
    }

    #[test]
    fn test_leader_state_calculate_commit_index() {
        let peers = vec![2, 3, 4, 5];
        let mut leader_state = LeaderState::new(&peers, 10);

        // With 5 nodes total, quorum is 3
        leader_state.update_success(2, 8);
        leader_state.update_success(3, 9);
        leader_state.update_success(4, 7);
        leader_state.update_success(5, 6);

        // Sorted match indices: [9, 8, 7, 6]
        // At position 1 (quorum_size - 2 = 3 - 2 = 1): index 8
        let commit_idx = leader_state.calculate_commit_index(10, 3);
        assert_eq!(commit_idx, 8);
    }

    #[test]
    fn test_candidate_state_new() {
        let state = CandidateState::new(1);
        // The candidate's own vote is counted immediately.
        assert_eq!(state.vote_count(), 1);
        assert!(state.votes_received.contains(&1));
    }

    #[test]
    fn test_candidate_state_record_vote() {
        let mut state = CandidateState::new(1);
        state.record_vote(2);
        state.record_vote(3);

        assert_eq!(state.vote_count(), 3);
        assert!(state.has_quorum(2));
    }

    #[test]
    fn test_candidate_state_has_quorum() {
        let mut state = CandidateState::new(1);
        assert!(state.has_quorum(1));
        assert!(!state.has_quorum(2));

        state.record_vote(2);
        assert!(state.has_quorum(2));
    }

    // ── Fast backup / conflict hint tests ─────────────────────────────

    #[test]
    fn test_update_failure_with_hint_jumps_to_conflict() {
        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        // next_index for peer 2 starts at 11

        ls.update_failure_with_hint(2, Some(5), Some(2), 8);
        assert_eq!(
            ls.get_next_index(2),
            5,
            "should jump back to conflict_index"
        );
    }

    #[test]
    fn test_update_failure_with_hint_no_hint_uses_last_index() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, 10);

        ls.update_failure_with_hint(2, None, None, 3);
        assert_eq!(
            ls.get_next_index(2),
            4,
            "should use follower_last_index + 1"
        );
    }

    #[test]
    fn test_update_failure_with_hint_does_not_go_forward() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, 5);
        // next_index for peer 2 = 6

        // Conflict hint at index 10 -- should not advance next_index
        ls.update_failure_with_hint(2, Some(10), Some(1), 9);
        // Should fall back to simple decrement since hint is not helpful
        assert_eq!(ls.get_next_index(2), 5);
    }

    // ── Joint consensus commit index tests ────────────────────────────

    #[test]
    fn test_calculate_commit_index_joint_stable() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 8);
        ls.update_success(3, 7);
        ls.update_success(4, 6);

        // Stable config: {1, 2, 3, 4, 5} -- but we only track 2, 3, 4
        // Leader (1) has last_index 10
        let config = ConfigState::Stable(ClusterConfig::new(
            vec![
                (1, "a".into()),
                (2, "b".into()),
                (3, "c".into()),
                (4, "d".into()),
                (5, "e".into()),
            ],
            0,
        ));

        // Quorum = 3 out of 5
        // Indices: leader=10, 2=8, 3=7, 4=6, 5=0
        // Sorted desc: [10, 8, 7, 6, 0]
        // quorum-1 = 2 => index 7
        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 7);
    }

    #[test]
    fn test_calculate_commit_index_joint_consensus() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 8);
        ls.update_success(3, 7);
        ls.update_success(4, 9);

        // old: {1, 2, 3} quorum = 2
        // new: {1, 2, 3, 4} quorum = 3
        let old = ClusterConfig::new(vec![(1, "a".into()), (2, "b".into()), (3, "c".into())], 0);
        let new = ClusterConfig::new(
            vec![
                (1, "a".into()),
                (2, "b".into()),
                (3, "c".into()),
                (4, "d".into()),
            ],
            1,
        );

        let config = ConfigState::Joint { old, new };

        // old: leader=10, 2=8, 3=7 => sorted [10, 8, 7] => quorum(2)-1=1 => 8
        // new: leader=10, 2=8, 3=7, 4=9 => sorted [10, 9, 8, 7] => quorum(3)-1=2 => 8
        // min(8, 8) = 8
        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 8);
    }

    #[test]
    fn test_calculate_commit_index_joint_limited_by_old() {
        use crate::types::{ClusterConfig, ConfigState};

        let peers = vec![2, 3, 4, 5];
        let mut ls = LeaderState::new(&peers, 10);
        ls.update_success(2, 3); // in old config, low match
        ls.update_success(3, 9); // in both
        ls.update_success(4, 9); // only in new
        ls.update_success(5, 9); // only in new

        // old: {1, 2, 3} quorum = 2
        // new: {1, 3, 4, 5} quorum = 3
        let old = ClusterConfig::new(vec![(1, "a".into()), (2, "b".into()), (3, "c".into())], 0);
        let new = ClusterConfig::new(
            vec![
                (1, "a".into()),
                (3, "c".into()),
                (4, "d".into()),
                (5, "e".into()),
            ],
            1,
        );

        let config = ConfigState::Joint { old, new };

        // old: leader=10, 2=3, 3=9 => sorted [10, 9, 3] => quorum(2)-1=1 => 9
        // new: leader=10, 3=9, 4=9, 5=9 => sorted [10, 9, 9, 9] => quorum(3)-1=2 => 9
        // min(9, 9) = 9
        let commit = ls.calculate_commit_index_joint(1, 10, &config);
        assert_eq!(commit, 9);
    }
}