Skip to main content

nodedb_raft/node/
core.rs

1//! `RaftNode` struct, constructors, simple accessors, `tick`, and `propose`.
2//!
3//! Membership mutation (add/remove voter, add/remove/promote learner) lives
4//! in [`super::membership`]. State transitions (election, `become_leader`,
5//! replication) live in [`super::internal`]. RPC handlers live in
6//! [`super::rpc`].
7
8use std::time::Instant;
9
10use crate::error::{RaftError, Result};
11use crate::log::RaftLog;
12use crate::message::{AppendEntriesRequest, LogEntry};
13use crate::state::{HardState, LeaderState, NodeRole, VolatileState};
14use crate::storage::LogStorage;
15
16use super::config::RaftConfig;
17
/// Output actions produced by a tick or RPC handler.
///
/// The caller (Multi-Raft coordinator) is responsible for executing these
/// via the transport and applying committed entries to the state machine.
///
/// The `Default` value has every field empty, which is what `is_empty()`
/// reports as "nothing to do".
#[derive(Debug, Default)]
pub struct Ready {
    /// Hard state to persist (if changed).
    pub hard_state: Option<HardState>,
    /// AppendEntries requests to send, as `(peer_id, request)` pairs.
    pub messages: Vec<(u64, AppendEntriesRequest)>,
    /// Vote requests to send, as `(peer_id, request)` pairs.
    pub vote_requests: Vec<(u64, crate::message::RequestVoteRequest)>,
    /// Newly committed entries to apply to the state machine.
    pub committed_entries: Vec<LogEntry>,
    /// Ids of peers that need an InstallSnapshot RPC because their
    /// next_index falls behind the leader's snapshot_index (log compacted).
    pub snapshots_needed: Vec<u64>,
}
36
37impl Ready {
38    pub fn is_empty(&self) -> bool {
39        self.hard_state.is_none()
40            && self.messages.is_empty()
41            && self.vote_requests.is_empty()
42            && self.committed_entries.is_empty()
43            && self.snapshots_needed.is_empty()
44    }
45}
46
/// A single Raft group's state machine.
///
/// This is a deterministic, event-driven core. It does NOT own any threads
/// or timers — the caller drives it via `tick()` and RPC handler methods,
/// and reads output via `take_ready()`.
///
/// Fields are `pub(super)` so that the sibling modules of this node module
/// can operate on the same state.
pub struct RaftNode<S: LogStorage> {
    /// Node configuration: node and group ids, voter (`peers`) and learner
    /// lists, election timeout bounds and heartbeat interval.
    pub(super) config: RaftConfig,
    /// Current role of this node (follower, candidate, leader or learner).
    pub(super) role: NodeRole,
    /// Persistent state (includes `current_term`); loaded from storage by
    /// `restore()`.
    pub(super) hard_state: HardState,
    /// In-memory progress markers: `commit_index` and `last_applied`.
    pub(super) volatile: VolatileState,
    /// Leader-side replication state, queried for per-peer match indexes.
    /// NOTE(review): presumably `Some` only while this node is leader —
    /// confirm against the state-transition code.
    pub(super) leader_state: Option<LeaderState>,
    /// The Raft log, backed by storage `S`.
    pub(super) log: RaftLog<S>,
    /// When the next election timeout fires.
    pub(super) election_deadline: Instant,
    /// When the next heartbeat should be sent (leader only).
    pub(super) heartbeat_deadline: Instant,
    /// Votes received in current election.
    pub(super) votes_received: Vec<u64>,
    /// Pending ready output.
    pub(super) ready: Ready,
    /// Known leader ID (0 = unknown).
    pub(super) leader_id: u64,
}
70
71impl<S: LogStorage> RaftNode<S> {
72    /// Create a new Raft node. Call `restore()` before ticking.
73    ///
74    /// If `config.starts_as_learner` is `true`, the node boots in the
75    /// `Learner` role and will never run an election timeout or become a
76    /// leader until it is promoted via `promote_self_to_voter`.
77    pub fn new(config: RaftConfig, storage: S) -> Self {
78        let now = Instant::now();
79        let role = if config.starts_as_learner {
80            NodeRole::Learner
81        } else {
82            NodeRole::Follower
83        };
84        Self {
85            log: RaftLog::new(storage),
86            role,
87            hard_state: HardState::new(),
88            volatile: VolatileState::new(),
89            leader_state: None,
90            election_deadline: now + config.election_timeout_max,
91            heartbeat_deadline: now,
92            votes_received: Vec::new(),
93            ready: Ready::default(),
94            leader_id: 0,
95            config,
96        }
97    }
98
99    /// Restore state from persistent storage. Must be called before ticking.
100    pub fn restore(&mut self) -> Result<()> {
101        self.hard_state = self.log.storage().load_hard_state()?;
102        self.log.restore()?;
103        self.reset_election_timeout();
104        Ok(())
105    }
106
107    pub fn node_id(&self) -> u64 {
108        self.config.node_id
109    }
110
111    pub fn group_id(&self) -> u64 {
112        self.config.group_id
113    }
114
115    pub fn role(&self) -> NodeRole {
116        self.role
117    }
118
119    pub fn leader_id(&self) -> u64 {
120        self.leader_id
121    }
122
123    pub fn current_term(&self) -> u64 {
124        self.hard_state.current_term
125    }
126
127    pub fn commit_index(&self) -> u64 {
128        self.volatile.commit_index
129    }
130
131    pub fn last_applied(&self) -> u64 {
132        self.volatile.last_applied
133    }
134
135    /// Override election deadline (for testing).
136    pub fn election_deadline_override(&mut self, deadline: Instant) {
137        self.election_deadline = deadline;
138    }
139
140    /// Take the pending `Ready` output. Caller must execute messages,
141    /// persist hard state, and apply committed entries.
142    pub fn take_ready(&mut self) -> Ready {
143        std::mem::take(&mut self.ready)
144    }
145
146    /// Advance `last_applied` after the caller has applied entries.
147    pub fn advance_applied(&mut self, applied_to: u64) {
148        self.volatile.last_applied = applied_to;
149    }
150
151    /// Query a peer's match_index from the leader's replication state.
152    /// Returns `None` if this node is not the leader or the peer is unknown.
153    pub fn match_index_for(&self, peer: u64) -> Option<u64> {
154        self.leader_state
155            .as_ref()
156            .map(|ls| ls.match_index_for(peer))
157    }
158
159    pub fn log_snapshot_index(&self) -> u64 {
160        self.log.snapshot_index()
161    }
162
163    pub fn log_snapshot_term(&self) -> u64 {
164        self.log.snapshot_term()
165    }
166
167    /// Current voter peer list (excluding self).
168    pub fn peers(&self) -> &[u64] {
169        &self.config.peers
170    }
171
172    /// Current voter peer list — alias for `peers()`, clearer at call sites
173    /// that need to distinguish voters from learners.
174    pub fn voters(&self) -> &[u64] {
175        &self.config.peers
176    }
177
178    /// Current learner peer list (excluding self).
179    pub fn learners(&self) -> &[u64] {
180        &self.config.learners
181    }
182
183    /// Whether `peer` is currently tracked as a learner in this group.
184    pub fn is_learner_peer(&self, peer: u64) -> bool {
185        self.config.learners.contains(&peer)
186    }
187
188    /// Drive time-based events: election timeout, heartbeat.
189    pub fn tick(&mut self) {
190        let now = Instant::now();
191
192        match self.role {
193            NodeRole::Follower | NodeRole::Candidate => {
194                if now >= self.election_deadline {
195                    self.start_election();
196                }
197            }
198            NodeRole::Leader => {
199                if now >= self.heartbeat_deadline {
200                    self.replicate_to_all();
201                    self.heartbeat_deadline = now + self.config.heartbeat_interval;
202                }
203            }
204            NodeRole::Learner => {
205                // Learners never run election timeouts. They catch up
206                // passively via AppendEntries from the leader.
207            }
208        }
209    }
210
211    /// Propose a new entry (leader only). Returns the log index.
212    pub fn propose(&mut self, data: Vec<u8>) -> Result<u64> {
213        if self.role != NodeRole::Leader {
214            return Err(RaftError::NotLeader {
215                leader_hint: if self.leader_id != 0 {
216                    Some(self.leader_id)
217                } else {
218                    None
219                },
220            });
221        }
222
223        let index = self.log.last_index() + 1;
224        let entry = LogEntry {
225            term: self.hard_state.current_term,
226            index,
227            data,
228        };
229
230        self.log.append(entry)?;
231        self.replicate_to_all();
232
233        // Single-voter cluster: commit immediately. Learners do not count.
234        if self.config.cluster_size() == 1 {
235            self.volatile.commit_index = index;
236            self.collect_committed_entries();
237        }
238
239        Ok(index)
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemStorage;
    use std::time::Duration;

    /// Voter configuration for `node_id` in group 1 with the given voter
    /// peers, no learners, a 150–300 ms election timeout and a 50 ms
    /// heartbeat interval.
    fn test_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
        let election_timeout_min = Duration::from_millis(150);
        let election_timeout_max = Duration::from_millis(300);
        let heartbeat_interval = Duration::from_millis(50);
        RaftConfig {
            node_id,
            group_id: 1,
            peers,
            learners: Vec::new(),
            starts_as_learner: false,
            election_timeout_min,
            election_timeout_max,
            heartbeat_interval,
        }
    }

    /// Same as `test_config`, but the node boots as a learner.
    fn learner_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
        RaftConfig {
            starts_as_learner: true,
            ..test_config(node_id, peers)
        }
    }

    /// Push the election deadline into the past, then tick once.
    fn expire_election_and_tick(node: &mut RaftNode<MemStorage>) {
        node.election_deadline = Instant::now() - Duration::from_millis(1);
        node.tick();
    }

    #[test]
    fn single_node_election() {
        let mut node = RaftNode::new(test_config(1, vec![]), MemStorage::new());

        expire_election_and_tick(&mut node);

        assert_eq!(node.role(), NodeRole::Leader);
        assert_eq!(node.current_term(), 1);
        assert_eq!(node.leader_id(), 1);
    }

    #[test]
    fn single_node_propose_and_commit() {
        let mut node = RaftNode::new(test_config(1, vec![]), MemStorage::new());
        expire_election_and_tick(&mut node);
        assert_eq!(node.role(), NodeRole::Leader);

        // Winning the election already yields committed output; acknowledge it.
        let after_election = node.take_ready();
        assert!(!after_election.committed_entries.is_empty());
        let applied_to = after_election.committed_entries.last().unwrap().index;
        node.advance_applied(applied_to);

        let proposed_at = node.propose(b"hello".to_vec()).unwrap();
        assert_eq!(proposed_at, 2);

        let after_propose = node.take_ready();
        assert_eq!(after_propose.committed_entries.len(), 1);
        assert_eq!(after_propose.committed_entries[0].data, b"hello");
    }

    #[test]
    fn propose_as_follower_fails() {
        let mut node = RaftNode::new(test_config(1, vec![2, 3]), MemStorage::new());

        let outcome = node.propose(b"data".to_vec());

        assert!(matches!(outcome, Err(RaftError::NotLeader { .. })));
    }

    #[test]
    fn snapshot_needed_after_compaction() {
        let mut node = RaftNode::new(test_config(1, vec![2, 3]), MemStorage::new());

        // Become leader of the three-node group with one granted vote.
        expire_election_and_tick(&mut node);
        let _ = node.take_ready();
        let granted = crate::message::RequestVoteResponse {
            term: 1,
            vote_granted: true,
        };
        node.handle_request_vote_response(2, &granted);
        assert_eq!(node.role(), NodeRole::Leader);
        let _ = node.take_ready();

        // Fill the log, then compact it behind a snapshot.
        for byte in 0..9 {
            node.propose(vec![byte]).unwrap();
        }
        let _ = node.take_ready();

        node.log.apply_snapshot(8, 1);

        node.replicate_to_all();
        let output = node.take_ready();

        assert!(
            !output.snapshots_needed.is_empty(),
            "expected snapshots_needed to be non-empty"
        );
    }

    #[test]
    fn starts_as_learner_role() {
        let node = RaftNode::new(learner_config(2, vec![1]), MemStorage::new());

        assert_eq!(node.role(), NodeRole::Learner);
    }

    #[test]
    fn learner_tick_does_not_start_election() {
        let mut node = RaftNode::new(learner_config(2, vec![1]), MemStorage::new());

        // With the election deadline in the past a follower would start an
        // election on the next tick; a learner must ignore it.
        expire_election_and_tick(&mut node);

        assert_eq!(node.role(), NodeRole::Learner);
        assert_eq!(node.current_term(), 0);
    }
}