Skip to main content

nodedb_raft/
state.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3/// Persistent state that must survive restarts.
4///
5/// Corresponds to Raft paper Figure 2 "Persistent state on all servers".
6#[derive(
7    Debug,
8    Clone,
9    PartialEq,
10    Eq,
11    serde::Serialize,
12    serde::Deserialize,
13    zerompk::ToMessagePack,
14    zerompk::FromMessagePack,
15)]
16pub struct HardState {
17    /// Latest term this server has seen.
18    pub current_term: u64,
19    /// Candidate that received vote in current term (0 = none).
20    pub voted_for: u64,
21}
22
23impl HardState {
24    pub fn new() -> Self {
25        Self {
26            current_term: 0,
27            voted_for: 0,
28        }
29    }
30}
31
32impl Default for HardState {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
/// The role a Raft node currently plays inside its group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    Follower,
    Candidate,
    Leader,
    /// Non-voting member that is still catching up; a newly joined node
    /// starts out as a learner.
    Learner,
    /// Cross-cluster observer: it is fed log entries from a source cluster as
    /// a non-voting member that is not part of the quorum. In contrast to a
    /// `Learner`, an observer is never promoted to `Voter` within this group —
    /// it stays an observer permanently. Its acks are advisory and never gate
    /// commit on the source.
    Observer,
}
52
/// How the local leader classifies a remote peer.
///
/// The classification lets the leader replicate entries to voter peers and
/// observer peers alike while keeping observers out of quorum math.
///
/// Every `match` on this enum must be exhaustive — do not use `_ =>` arms.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRole {
    /// Full voting member: takes part in leader election and in the commit
    /// quorum.
    Voter,
    /// Cross-cluster observer: receives entries and sends advisory acks.
    /// Never counted in quorum; a slow-applying observer does not stall
    /// source commits.
    Observer,
}
69
/// Volatile state kept by every server.
#[derive(Debug, Clone, Default)]
pub struct VolatileState {
    /// Highest log index known to be committed.
    pub commit_index: u64,
    /// Highest log index that has been applied to the state machine.
    pub last_applied: u64,
}

impl VolatileState {
    /// Returns the initial state with both indices at `0`.
    ///
    /// Equivalent to [`VolatileState::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
93
/// Send-side bookkeeping the leader keeps for a single observer.
///
/// Every observer has its own bounded send queue. If a slow observer lets
/// that queue fill up, new entries are dropped from the queue and the
/// observer goes through snapshot recovery when it reconnects. The pace at
/// which an observer applies entries never delays commits on the source.
#[derive(Debug, Clone)]
pub struct ObserverState {
    /// Index of the next log entry that should be sent to this observer.
    pub next_index: u64,
    /// Highest log index this observer has acknowledged (advisory only).
    pub match_index: u64,
    /// How many entries are currently queued for this observer. Used for
    /// advisory backpressure tracking only; it does not gate commit.
    pub pending_count: u32,
}

impl ObserverState {
    /// Upper bound on the in-flight entries queued for one observer. At this
    /// bound the leader stops pushing and waits for the observer to drain;
    /// replication resumes once the observer is back below the threshold.
    /// Source commits are never affected.
    pub const MAX_PENDING: u32 = 256;
}
118
119/// Volatile state on leaders (reinitialized after election).
120#[derive(Debug, Clone)]
121pub struct LeaderState {
122    /// For each voter peer: index of next log entry to send.
123    pub next_index: Vec<(u64, u64)>,
124    /// For each voter/learner peer: index of highest log entry known to be replicated.
125    pub match_index: Vec<(u64, u64)>,
126    /// Per-observer send state. Observers are tracked separately from voters
127    /// and learners so quorum math never accidentally includes them.
128    pub observer_states: Vec<(u64, ObserverState)>,
129}
130
131impl LeaderState {
132    /// Create leader state for the given voter/learner peers plus observers.
133    pub fn new(peers: &[u64], observers: &[u64], last_log_index: u64) -> Self {
134        Self {
135            next_index: peers.iter().map(|&id| (id, last_log_index + 1)).collect(),
136            match_index: peers.iter().map(|&id| (id, 0)).collect(),
137            observer_states: observers
138                .iter()
139                .map(|&id| {
140                    (
141                        id,
142                        ObserverState {
143                            next_index: last_log_index + 1,
144                            match_index: 0,
145                            pending_count: 0,
146                        },
147                    )
148                })
149                .collect(),
150        }
151    }
152
153    pub fn next_index_for(&self, peer: u64) -> u64 {
154        self.next_index
155            .iter()
156            .find(|&&(id, _)| id == peer)
157            .map(|&(_, idx)| idx)
158            .unwrap_or(1)
159    }
160
161    pub fn set_next_index(&mut self, peer: u64, index: u64) {
162        if let Some(entry) = self.next_index.iter_mut().find(|e| e.0 == peer) {
163            entry.1 = index;
164        }
165    }
166
167    pub fn match_index_for(&self, peer: u64) -> u64 {
168        self.match_index
169            .iter()
170            .find(|&&(id, _)| id == peer)
171            .map(|&(_, idx)| idx)
172            .unwrap_or(0)
173    }
174
175    pub fn set_match_index(&mut self, peer: u64, index: u64) {
176        if let Some(entry) = self.match_index.iter_mut().find(|e| e.0 == peer) {
177            entry.1 = index;
178        }
179    }
180
181    /// Add a new voter/learner peer to leader tracking.
182    pub fn add_peer(&mut self, peer: u64, last_log_index: u64) {
183        if !self.next_index.iter().any(|&(id, _)| id == peer) {
184            self.next_index.push((peer, last_log_index + 1));
185            self.match_index.push((peer, 0));
186        }
187    }
188
189    /// Remove a voter/learner peer from leader tracking.
190    pub fn remove_peer(&mut self, peer: u64) {
191        self.next_index.retain(|&(id, _)| id != peer);
192        self.match_index.retain(|&(id, _)| id != peer);
193    }
194
195    /// Current voter/learner peer list tracked by this leader state.
196    pub fn peers(&self) -> Vec<u64> {
197        self.next_index.iter().map(|&(id, _)| id).collect()
198    }
199
200    /// Add an observer to leader tracking.
201    pub fn add_observer(&mut self, observer: u64, last_log_index: u64) {
202        if !self.observer_states.iter().any(|&(id, _)| id == observer) {
203            self.observer_states.push((
204                observer,
205                ObserverState {
206                    next_index: last_log_index + 1,
207                    match_index: 0,
208                    pending_count: 0,
209                },
210            ));
211        }
212    }
213
214    /// Remove an observer from leader tracking.
215    pub fn remove_observer(&mut self, observer: u64) {
216        self.observer_states.retain(|&(id, _)| id != observer);
217    }
218
219    /// Get a mutable reference to an observer's state.
220    pub fn observer_state_mut(&mut self, observer: u64) -> Option<&mut ObserverState> {
221        self.observer_states
222            .iter_mut()
223            .find(|(id, _)| *id == observer)
224            .map(|(_, state)| state)
225    }
226
227    /// Whether an observer's send queue is below the backpressure threshold.
228    /// Returns `false` if the observer is unknown.
229    pub fn observer_can_receive(&self, observer: u64) -> bool {
230        self.observer_states
231            .iter()
232            .find(|(id, _)| *id == observer)
233            .map(|(_, state)| state.pending_count < ObserverState::MAX_PENDING)
234            .unwrap_or(false)
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// `new()` and `Default::default()` both yield the zeroed initial state.
    #[test]
    fn hard_state_default() {
        let hs = HardState::new();
        assert_eq!(hs.current_term, 0);
        assert_eq!(hs.voted_for, 0);
        assert_eq!(HardState::default(), hs);
    }

    /// A fresh leader starts every peer at `last_log_index + 1` / `0`.
    #[test]
    fn leader_state_initialization() {
        let peers = vec![2, 3, 4];
        let ls = LeaderState::new(&peers, &[], 10);
        assert_eq!(ls.next_index_for(2), 11);
        assert_eq!(ls.next_index_for(3), 11);
        assert_eq!(ls.match_index_for(2), 0);
    }

    /// Updating one peer leaves the others untouched.
    #[test]
    fn leader_state_update() {
        let peers = vec![2, 3];
        let mut ls = LeaderState::new(&peers, &[], 5);
        ls.set_next_index(2, 8);
        ls.set_match_index(2, 7);
        assert_eq!(ls.next_index_for(2), 8);
        assert_eq!(ls.match_index_for(2), 7);
        // Peer 3 unchanged.
        assert_eq!(ls.next_index_for(3), 6);
    }

    /// Untracked peers read as the documented fallbacks and ignore writes.
    #[test]
    fn leader_state_unknown_peer() {
        let mut ls = LeaderState::new(&[2], &[], 5);
        assert_eq!(ls.next_index_for(99), 1);
        assert_eq!(ls.match_index_for(99), 0);
        ls.set_next_index(99, 4);
        ls.set_match_index(99, 4);
        assert_eq!(ls.peers(), vec![2]);
        assert_eq!(ls.next_index_for(99), 1);
        assert_eq!(ls.match_index_for(99), 0);
    }

    /// `add_peer` is idempotent and `remove_peer` clears both tables.
    #[test]
    fn leader_state_add_remove_peer() {
        let mut ls = LeaderState::new(&[2], &[], 5);
        ls.add_peer(3, 9);
        ls.add_peer(3, 20);
        assert_eq!(ls.peers(), vec![2, 3]);
        assert_eq!(ls.next_index_for(3), 10);
        ls.remove_peer(2);
        assert_eq!(ls.peers(), vec![3]);
        assert_eq!(ls.next_index_for(2), 1);
        assert_eq!(ls.match_index_for(2), 0);
    }

    /// An observer can receive only while below `MAX_PENDING`.
    #[test]
    fn observer_backpressure_threshold() {
        let mut ls = LeaderState::new(&[2], &[7], 5);
        assert!(ls.observer_can_receive(7));
        assert!(!ls.observer_can_receive(8));
        ls.observer_state_mut(7)
            .expect("observer 7 is tracked")
            .pending_count = ObserverState::MAX_PENDING;
        assert!(!ls.observer_can_receive(7));
        ls.remove_observer(7);
        assert!(ls.observer_state_mut(7).is_none());
    }

    #[test]
    fn node_role_equality() {
        assert_eq!(NodeRole::Follower, NodeRole::Follower);
        assert_ne!(NodeRole::Follower, NodeRole::Leader);
    }
}