1use std::time::Instant;
9
10use crate::error::{RaftError, Result};
11use crate::log::RaftLog;
12use crate::message::{AppendEntriesRequest, LogEntry};
13use crate::state::{HardState, LeaderState, NodeRole, VolatileState};
14use crate::storage::LogStorage;
15
16use super::config::RaftConfig;
17
18#[derive(Debug, Default)]
23pub struct Ready {
24 pub hard_state: Option<HardState>,
26 pub messages: Vec<(u64, AppendEntriesRequest)>,
28 pub vote_requests: Vec<(u64, crate::message::RequestVoteRequest)>,
30 pub committed_entries: Vec<LogEntry>,
32 pub snapshots_needed: Vec<u64>,
35}
36
37impl Ready {
38 pub fn is_empty(&self) -> bool {
39 self.hard_state.is_none()
40 && self.messages.is_empty()
41 && self.vote_requests.is_empty()
42 && self.committed_entries.is_empty()
43 && self.snapshots_needed.is_empty()
44 }
45}
46
47pub struct RaftNode<S: LogStorage> {
53 pub(super) config: RaftConfig,
54 pub(super) role: NodeRole,
55 pub(super) hard_state: HardState,
56 pub(super) volatile: VolatileState,
57 pub(super) leader_state: Option<LeaderState>,
58 pub(super) log: RaftLog<S>,
59 pub(super) election_deadline: Instant,
61 pub(super) heartbeat_deadline: Instant,
63 pub(super) votes_received: Vec<u64>,
65 pub(super) ready: Ready,
67 pub(super) leader_id: u64,
69}
70
71impl<S: LogStorage> RaftNode<S> {
72 pub fn new(config: RaftConfig, storage: S) -> Self {
78 let now = Instant::now();
79 let role = if config.starts_as_learner {
80 NodeRole::Learner
81 } else {
82 NodeRole::Follower
83 };
84 Self {
85 log: RaftLog::new(storage),
86 role,
87 hard_state: HardState::new(),
88 volatile: VolatileState::new(),
89 leader_state: None,
90 election_deadline: now + config.election_timeout_max,
91 heartbeat_deadline: now,
92 votes_received: Vec::new(),
93 ready: Ready::default(),
94 leader_id: 0,
95 config,
96 }
97 }
98
99 pub fn restore(&mut self) -> Result<()> {
101 self.hard_state = self.log.storage().load_hard_state()?;
102 self.log.restore()?;
103 self.reset_election_timeout();
104 Ok(())
105 }
106
107 pub fn node_id(&self) -> u64 {
108 self.config.node_id
109 }
110
111 pub fn group_id(&self) -> u64 {
112 self.config.group_id
113 }
114
115 pub fn role(&self) -> NodeRole {
116 self.role
117 }
118
119 pub fn leader_id(&self) -> u64 {
120 self.leader_id
121 }
122
123 pub fn current_term(&self) -> u64 {
124 self.hard_state.current_term
125 }
126
127 pub fn commit_index(&self) -> u64 {
128 self.volatile.commit_index
129 }
130
131 pub fn last_applied(&self) -> u64 {
132 self.volatile.last_applied
133 }
134
135 pub fn election_deadline_override(&mut self, deadline: Instant) {
137 self.election_deadline = deadline;
138 }
139
140 pub fn take_ready(&mut self) -> Ready {
143 std::mem::take(&mut self.ready)
144 }
145
146 pub fn advance_applied(&mut self, applied_to: u64) {
148 self.volatile.last_applied = applied_to;
149 }
150
151 pub fn match_index_for(&self, peer: u64) -> Option<u64> {
154 self.leader_state
155 .as_ref()
156 .map(|ls| ls.match_index_for(peer))
157 }
158
159 pub fn log_snapshot_index(&self) -> u64 {
160 self.log.snapshot_index()
161 }
162
163 pub fn log_snapshot_term(&self) -> u64 {
164 self.log.snapshot_term()
165 }
166
167 pub fn peers(&self) -> &[u64] {
169 &self.config.peers
170 }
171
172 pub fn voters(&self) -> &[u64] {
175 &self.config.peers
176 }
177
178 pub fn learners(&self) -> &[u64] {
180 &self.config.learners
181 }
182
183 pub fn is_learner_peer(&self, peer: u64) -> bool {
185 self.config.learners.contains(&peer)
186 }
187
188 pub fn tick(&mut self) {
190 let now = Instant::now();
191
192 match self.role {
193 NodeRole::Follower | NodeRole::Candidate => {
194 if now >= self.election_deadline {
195 self.start_election();
196 }
197 }
198 NodeRole::Leader => {
199 if now >= self.heartbeat_deadline {
200 self.replicate_to_all();
201 self.heartbeat_deadline = now + self.config.heartbeat_interval;
202 }
203 }
204 NodeRole::Learner => {
205 }
208 }
209 }
210
211 pub fn propose(&mut self, data: Vec<u8>) -> Result<u64> {
213 if self.role != NodeRole::Leader {
214 return Err(RaftError::NotLeader {
215 leader_hint: if self.leader_id != 0 {
216 Some(self.leader_id)
217 } else {
218 None
219 },
220 });
221 }
222
223 let index = self.log.last_index() + 1;
224 let entry = LogEntry {
225 term: self.hard_state.current_term,
226 index,
227 data,
228 };
229
230 self.log.append(entry)?;
231 self.replicate_to_all();
232
233 if self.config.cluster_size() == 1 {
235 self.volatile.commit_index = index;
236 self.collect_committed_entries();
237 }
238
239 Ok(index)
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::MemStorage;
    use std::time::Duration;

    /// Config for group 1 with no learners, a 150–300 ms election timeout window and a 50 ms
    /// heartbeat interval.
    fn test_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
        RaftConfig {
            node_id,
            group_id: 1,
            peers,
            learners: vec![],
            starts_as_learner: false,
            election_timeout_min: Duration::from_millis(150),
            election_timeout_max: Duration::from_millis(300),
            heartbeat_interval: Duration::from_millis(50),
        }
    }

    /// An instant one millisecond in the past, used to make a deadline due on the next tick.
    fn just_elapsed() -> Instant {
        Instant::now() - Duration::from_millis(1)
    }

    /// A node with no peers becomes leader of term 1 on the first tick after its deadline.
    #[test]
    fn single_node_election() {
        let mut node = RaftNode::new(test_config(1, vec![]), MemStorage::new());

        node.election_deadline = just_elapsed();
        node.tick();

        assert_eq!(node.role(), NodeRole::Leader);
        assert_eq!(node.current_term(), 1);
        assert_eq!(node.leader_id(), 1);
    }

    /// A single-node leader commits a proposal immediately and reports it via `take_ready`.
    #[test]
    fn single_node_propose_and_commit() {
        let mut node = RaftNode::new(test_config(1, vec![]), MemStorage::new());
        node.election_deadline = just_elapsed();
        node.tick();
        assert_eq!(node.role(), NodeRole::Leader);

        // The election itself must already have produced committed entries.
        let after_election = node.take_ready();
        assert!(!after_election.committed_entries.is_empty());
        let last_committed = after_election.committed_entries.last().unwrap().index;
        node.advance_applied(last_committed);

        let proposed_index = node.propose(b"hello".to_vec()).unwrap();
        assert_eq!(proposed_index, 2);

        let after_propose = node.take_ready();
        assert_eq!(after_propose.committed_entries.len(), 1);
        assert_eq!(after_propose.committed_entries[0].data, b"hello");
    }

    /// A freshly created follower rejects proposals with `NotLeader`.
    #[test]
    fn propose_as_follower_fails() {
        let mut node = RaftNode::new(test_config(1, vec![2, 3]), MemStorage::new());
        let err = node.propose(b"data".to_vec()).unwrap_err();
        assert!(matches!(err, RaftError::NotLeader { .. }));
    }

    /// After the log is compacted, replicating to all peers reports that snapshots are needed.
    #[test]
    fn snapshot_needed_after_compaction() {
        let mut node = RaftNode::new(test_config(1, vec![2, 3]), MemStorage::new());

        // Start an election, then win it with a granted vote from peer 2.
        node.election_deadline = just_elapsed();
        node.tick();
        let _ready = node.take_ready();
        let granted = crate::message::RequestVoteResponse {
            term: 1,
            vote_granted: true,
        };
        node.handle_request_vote_response(2, &granted);
        assert_eq!(node.role(), NodeRole::Leader);
        let _ = node.take_ready();

        for payload_byte in 0..9 {
            node.propose(vec![payload_byte]).unwrap();
        }
        let _ = node.take_ready();

        node.log.apply_snapshot(8, 1);

        node.replicate_to_all();
        let after_compaction = node.take_ready();

        assert!(
            !after_compaction.snapshots_needed.is_empty(),
            "expected snapshots_needed to be non-empty"
        );
    }

    /// `starts_as_learner` makes the node begin in the learner role.
    #[test]
    fn starts_as_learner_role() {
        let mut learner_cfg = test_config(2, vec![1]);
        learner_cfg.starts_as_learner = true;
        let node = RaftNode::new(learner_cfg, MemStorage::new());
        assert_eq!(node.role(), NodeRole::Learner);
    }

    /// A learner stays a learner in term 0 even when its election deadline has passed.
    #[test]
    fn learner_tick_does_not_start_election() {
        let mut learner_cfg = test_config(2, vec![1]);
        learner_cfg.starts_as_learner = true;
        let mut node = RaftNode::new(learner_cfg, MemStorage::new());
        node.election_deadline = just_elapsed();
        node.tick();
        assert_eq!(node.role(), NodeRole::Learner);
        assert_eq!(node.current_term(), 0);
    }
}