Skip to main content

amaters_cluster/
rpc.rs

1//! RPC message types for Raft consensus
2
3use crate::log::LogEntry;
4use crate::types::{FencingToken, LogIndex, NodeId, Term};
5
6/// Request for a vote from a candidate
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct RequestVoteRequest {
9    /// Candidate's term
10    pub term: Term,
11    /// Candidate requesting vote
12    pub candidate_id: NodeId,
13    /// Index of candidate's last log entry
14    pub last_log_index: LogIndex,
15    /// Term of candidate's last log entry
16    pub last_log_term: Term,
17}
18
19impl RequestVoteRequest {
20    /// Create a new vote request
21    pub fn new(
22        term: Term,
23        candidate_id: NodeId,
24        last_log_index: LogIndex,
25        last_log_term: Term,
26    ) -> Self {
27        Self {
28            term,
29            candidate_id,
30            last_log_index,
31            last_log_term,
32        }
33    }
34}
35
36/// Response to a vote request
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct RequestVoteResponse {
39    /// Current term, for candidate to update itself
40    pub term: Term,
41    /// True if candidate received vote
42    pub vote_granted: bool,
43    /// Hint about who the current leader is (for client redirection)
44    pub leader_hint: Option<NodeId>,
45}
46
47impl RequestVoteResponse {
48    /// Create a new vote response
49    pub fn new(term: Term, vote_granted: bool) -> Self {
50        Self {
51            term,
52            vote_granted,
53            leader_hint: None,
54        }
55    }
56
57    /// Create a new vote response with a leader hint
58    pub fn with_leader_hint(term: Term, vote_granted: bool, leader_hint: Option<NodeId>) -> Self {
59        Self {
60            term,
61            vote_granted,
62            leader_hint,
63        }
64    }
65
66    /// Create a granted response
67    pub fn granted(term: Term) -> Self {
68        Self::new(term, true)
69    }
70
71    /// Create a rejected response
72    pub fn rejected(term: Term) -> Self {
73        Self::new(term, false)
74    }
75}
76
77/// Request to append entries (heartbeat or log replication)
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct AppendEntriesRequest {
80    /// Leader's term
81    pub term: Term,
82    /// Leader's ID (for redirecting clients)
83    pub leader_id: NodeId,
84    /// Index of log entry immediately preceding new ones
85    pub prev_log_index: LogIndex,
86    /// Term of prev_log_index entry
87    pub prev_log_term: Term,
88    /// Log entries to store (empty for heartbeat)
89    pub entries: Vec<LogEntry>,
90    /// Leader's commit index
91    pub leader_commit: LogIndex,
92    /// Optional fencing token issued by the leader (None for legacy / non-leaders)
93    pub fencing_token: Option<FencingToken>,
94}
95
96impl AppendEntriesRequest {
97    /// Create a new append entries request
98    pub fn new(
99        term: Term,
100        leader_id: NodeId,
101        prev_log_index: LogIndex,
102        prev_log_term: Term,
103        entries: Vec<LogEntry>,
104        leader_commit: LogIndex,
105    ) -> Self {
106        Self {
107            term,
108            leader_id,
109            prev_log_index,
110            prev_log_term,
111            entries,
112            leader_commit,
113            fencing_token: None,
114        }
115    }
116
117    /// Create an append entries request with a fencing token attached
118    pub fn with_fencing_token(
119        term: Term,
120        leader_id: NodeId,
121        prev_log_index: LogIndex,
122        prev_log_term: Term,
123        entries: Vec<LogEntry>,
124        leader_commit: LogIndex,
125        token: FencingToken,
126    ) -> Self {
127        Self {
128            term,
129            leader_id,
130            prev_log_index,
131            prev_log_term,
132            entries,
133            leader_commit,
134            fencing_token: Some(token),
135        }
136    }
137
138    /// Create a heartbeat (empty entries)
139    pub fn heartbeat(
140        term: Term,
141        leader_id: NodeId,
142        prev_log_index: LogIndex,
143        prev_log_term: Term,
144        leader_commit: LogIndex,
145    ) -> Self {
146        Self::new(
147            term,
148            leader_id,
149            prev_log_index,
150            prev_log_term,
151            Vec::new(),
152            leader_commit,
153        )
154    }
155
156    /// Check if this is a heartbeat (no entries)
157    pub fn is_heartbeat(&self) -> bool {
158        self.entries.is_empty()
159    }
160}
161
162/// Response to an append entries request
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct AppendEntriesResponse {
165    /// Current term, for leader to update itself
166    pub term: Term,
167    /// True if follower contained entry matching prev_log_index and prev_log_term
168    pub success: bool,
169    /// Follower's last log index (for optimization)
170    pub last_log_index: LogIndex,
171    /// Conflict index (for fast log backtracking)
172    pub conflict_index: Option<LogIndex>,
173    /// Conflict term (for fast log backtracking)
174    pub conflict_term: Option<Term>,
175    /// Hint about who the current leader is (for client redirection)
176    pub leader_hint: Option<NodeId>,
177    /// Optional fencing token echoed back from the follower
178    pub fencing_token: Option<FencingToken>,
179}
180
181impl AppendEntriesResponse {
182    /// Create a new append entries response
183    pub fn new(
184        term: Term,
185        success: bool,
186        last_log_index: LogIndex,
187        conflict_index: Option<LogIndex>,
188        conflict_term: Option<Term>,
189    ) -> Self {
190        Self {
191            term,
192            success,
193            last_log_index,
194            conflict_index,
195            conflict_term,
196            leader_hint: None,
197            fencing_token: None,
198        }
199    }
200
201    /// Create a success response with a fencing token echoed
202    pub fn success_with_token(term: Term, last_log_index: LogIndex, token: FencingToken) -> Self {
203        Self {
204            term,
205            success: true,
206            last_log_index,
207            conflict_index: None,
208            conflict_term: None,
209            leader_hint: None,
210            fencing_token: Some(token),
211        }
212    }
213
214    /// Create a success response
215    pub fn success(term: Term, last_log_index: LogIndex) -> Self {
216        Self::new(term, true, last_log_index, None, None)
217    }
218
219    /// Create a failure response
220    pub fn failure(
221        term: Term,
222        last_log_index: LogIndex,
223        conflict_index: LogIndex,
224        conflict_term: Term,
225    ) -> Self {
226        Self::new(
227            term,
228            false,
229            last_log_index,
230            Some(conflict_index),
231            Some(conflict_term),
232        )
233    }
234
235    /// Create a rejected response (generic failure)
236    pub fn rejected(term: Term) -> Self {
237        Self::new(term, false, 0, None, None)
238    }
239
240    /// Attach a leader hint to this response.
241    pub fn with_leader_hint(mut self, leader_hint: Option<NodeId>) -> Self {
242        self.leader_hint = leader_hint;
243        self
244    }
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    use crate::log::Command;
251
252    #[test]
253    fn test_request_vote_request() {
254        let req = RequestVoteRequest::new(5, 1, 10, 3);
255        assert_eq!(req.term, 5);
256        assert_eq!(req.candidate_id, 1);
257        assert_eq!(req.last_log_index, 10);
258        assert_eq!(req.last_log_term, 3);
259    }
260
261    #[test]
262    fn test_request_vote_response() {
263        let resp = RequestVoteResponse::granted(5);
264        assert_eq!(resp.term, 5);
265        assert!(resp.vote_granted);
266
267        let resp = RequestVoteResponse::rejected(6);
268        assert_eq!(resp.term, 6);
269        assert!(!resp.vote_granted);
270    }
271
272    #[test]
273    fn test_append_entries_request_heartbeat() {
274        let req = AppendEntriesRequest::heartbeat(5, 1, 10, 3, 8);
275        assert_eq!(req.term, 5);
276        assert_eq!(req.leader_id, 1);
277        assert_eq!(req.prev_log_index, 10);
278        assert_eq!(req.prev_log_term, 3);
279        assert!(req.entries.is_empty());
280        assert_eq!(req.leader_commit, 8);
281        assert!(req.is_heartbeat());
282    }
283
284    #[test]
285    fn test_append_entries_request_with_entries() {
286        let entry = LogEntry::new(5, 11, Command::from_str("test"));
287        let req = AppendEntriesRequest::new(5, 1, 10, 3, vec![entry], 8);
288        assert!(!req.is_heartbeat());
289        assert_eq!(req.entries.len(), 1);
290    }
291
292    #[test]
293    fn test_append_entries_response_success() {
294        let resp = AppendEntriesResponse::success(5, 11);
295        assert_eq!(resp.term, 5);
296        assert!(resp.success);
297        assert_eq!(resp.last_log_index, 11);
298        assert!(resp.conflict_index.is_none());
299        assert!(resp.conflict_term.is_none());
300    }
301
302    #[test]
303    fn test_append_entries_response_failure() {
304        let resp = AppendEntriesResponse::failure(5, 9, 8, 2);
305        assert_eq!(resp.term, 5);
306        assert!(!resp.success);
307        assert_eq!(resp.last_log_index, 9);
308        assert_eq!(resp.conflict_index, Some(8));
309        assert_eq!(resp.conflict_term, Some(2));
310    }
311
312    #[test]
313    fn test_request_vote_response_with_leader_hint() {
314        let resp = RequestVoteResponse::with_leader_hint(5, false, Some(3));
315        assert_eq!(resp.term, 5);
316        assert!(!resp.vote_granted);
317        assert_eq!(resp.leader_hint, Some(3));
318    }
319
320    #[test]
321    fn test_append_entries_response_with_leader_hint() {
322        let resp = AppendEntriesResponse::success(5, 11).with_leader_hint(Some(2));
323        assert_eq!(resp.leader_hint, Some(2));
324        assert!(resp.success);
325    }
326
327    #[test]
328    fn test_leader_hint_none_by_default() {
329        let resp = AppendEntriesResponse::success(5, 11);
330        assert_eq!(resp.leader_hint, None);
331
332        let vote_resp = RequestVoteResponse::granted(5);
333        assert_eq!(vote_resp.leader_hint, None);
334    }
335}