amaters_cluster/
rpc.rs

1//! RPC message types for Raft consensus
2
3use crate::log::LogEntry;
4use crate::types::{LogIndex, NodeId, Term};
5
6/// Request for a vote from a candidate
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct RequestVoteRequest {
9    /// Candidate's term
10    pub term: Term,
11    /// Candidate requesting vote
12    pub candidate_id: NodeId,
13    /// Index of candidate's last log entry
14    pub last_log_index: LogIndex,
15    /// Term of candidate's last log entry
16    pub last_log_term: Term,
17}
18
19impl RequestVoteRequest {
20    /// Create a new vote request
21    pub fn new(
22        term: Term,
23        candidate_id: NodeId,
24        last_log_index: LogIndex,
25        last_log_term: Term,
26    ) -> Self {
27        Self {
28            term,
29            candidate_id,
30            last_log_index,
31            last_log_term,
32        }
33    }
34}
35
36/// Response to a vote request
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct RequestVoteResponse {
39    /// Current term, for candidate to update itself
40    pub term: Term,
41    /// True if candidate received vote
42    pub vote_granted: bool,
43}
44
45impl RequestVoteResponse {
46    /// Create a new vote response
47    pub fn new(term: Term, vote_granted: bool) -> Self {
48        Self { term, vote_granted }
49    }
50
51    /// Create a granted response
52    pub fn granted(term: Term) -> Self {
53        Self::new(term, true)
54    }
55
56    /// Create a rejected response
57    pub fn rejected(term: Term) -> Self {
58        Self::new(term, false)
59    }
60}
61
62/// Request to append entries (heartbeat or log replication)
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub struct AppendEntriesRequest {
65    /// Leader's term
66    pub term: Term,
67    /// Leader's ID (for redirecting clients)
68    pub leader_id: NodeId,
69    /// Index of log entry immediately preceding new ones
70    pub prev_log_index: LogIndex,
71    /// Term of prev_log_index entry
72    pub prev_log_term: Term,
73    /// Log entries to store (empty for heartbeat)
74    pub entries: Vec<LogEntry>,
75    /// Leader's commit index
76    pub leader_commit: LogIndex,
77}
78
79impl AppendEntriesRequest {
80    /// Create a new append entries request
81    pub fn new(
82        term: Term,
83        leader_id: NodeId,
84        prev_log_index: LogIndex,
85        prev_log_term: Term,
86        entries: Vec<LogEntry>,
87        leader_commit: LogIndex,
88    ) -> Self {
89        Self {
90            term,
91            leader_id,
92            prev_log_index,
93            prev_log_term,
94            entries,
95            leader_commit,
96        }
97    }
98
99    /// Create a heartbeat (empty entries)
100    pub fn heartbeat(
101        term: Term,
102        leader_id: NodeId,
103        prev_log_index: LogIndex,
104        prev_log_term: Term,
105        leader_commit: LogIndex,
106    ) -> Self {
107        Self::new(
108            term,
109            leader_id,
110            prev_log_index,
111            prev_log_term,
112            Vec::new(),
113            leader_commit,
114        )
115    }
116
117    /// Check if this is a heartbeat (no entries)
118    pub fn is_heartbeat(&self) -> bool {
119        self.entries.is_empty()
120    }
121}
122
123/// Response to an append entries request
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct AppendEntriesResponse {
126    /// Current term, for leader to update itself
127    pub term: Term,
128    /// True if follower contained entry matching prev_log_index and prev_log_term
129    pub success: bool,
130    /// Follower's last log index (for optimization)
131    pub last_log_index: LogIndex,
132    /// Conflict index (for fast log backtracking)
133    pub conflict_index: Option<LogIndex>,
134    /// Conflict term (for fast log backtracking)
135    pub conflict_term: Option<Term>,
136}
137
138impl AppendEntriesResponse {
139    /// Create a new append entries response
140    pub fn new(
141        term: Term,
142        success: bool,
143        last_log_index: LogIndex,
144        conflict_index: Option<LogIndex>,
145        conflict_term: Option<Term>,
146    ) -> Self {
147        Self {
148            term,
149            success,
150            last_log_index,
151            conflict_index,
152            conflict_term,
153        }
154    }
155
156    /// Create a success response
157    pub fn success(term: Term, last_log_index: LogIndex) -> Self {
158        Self::new(term, true, last_log_index, None, None)
159    }
160
161    /// Create a failure response
162    pub fn failure(
163        term: Term,
164        last_log_index: LogIndex,
165        conflict_index: LogIndex,
166        conflict_term: Term,
167    ) -> Self {
168        Self::new(
169            term,
170            false,
171            last_log_index,
172            Some(conflict_index),
173            Some(conflict_term),
174        )
175    }
176
177    /// Create a rejected response (generic failure)
178    pub fn rejected(term: Term) -> Self {
179        Self::new(term, false, 0, None, None)
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use crate::log::Command;
187
188    #[test]
189    fn test_request_vote_request() {
190        let req = RequestVoteRequest::new(5, 1, 10, 3);
191        assert_eq!(req.term, 5);
192        assert_eq!(req.candidate_id, 1);
193        assert_eq!(req.last_log_index, 10);
194        assert_eq!(req.last_log_term, 3);
195    }
196
197    #[test]
198    fn test_request_vote_response() {
199        let resp = RequestVoteResponse::granted(5);
200        assert_eq!(resp.term, 5);
201        assert!(resp.vote_granted);
202
203        let resp = RequestVoteResponse::rejected(6);
204        assert_eq!(resp.term, 6);
205        assert!(!resp.vote_granted);
206    }
207
208    #[test]
209    fn test_append_entries_request_heartbeat() {
210        let req = AppendEntriesRequest::heartbeat(5, 1, 10, 3, 8);
211        assert_eq!(req.term, 5);
212        assert_eq!(req.leader_id, 1);
213        assert_eq!(req.prev_log_index, 10);
214        assert_eq!(req.prev_log_term, 3);
215        assert!(req.entries.is_empty());
216        assert_eq!(req.leader_commit, 8);
217        assert!(req.is_heartbeat());
218    }
219
220    #[test]
221    fn test_append_entries_request_with_entries() {
222        let entry = LogEntry::new(5, 11, Command::from_str("test"));
223        let req = AppendEntriesRequest::new(5, 1, 10, 3, vec![entry], 8);
224        assert!(!req.is_heartbeat());
225        assert_eq!(req.entries.len(), 1);
226    }
227
228    #[test]
229    fn test_append_entries_response_success() {
230        let resp = AppendEntriesResponse::success(5, 11);
231        assert_eq!(resp.term, 5);
232        assert!(resp.success);
233        assert_eq!(resp.last_log_index, 11);
234        assert!(resp.conflict_index.is_none());
235        assert!(resp.conflict_term.is_none());
236    }
237
238    #[test]
239    fn test_append_entries_response_failure() {
240        let resp = AppendEntriesResponse::failure(5, 9, 8, 2);
241        assert_eq!(resp.term, 5);
242        assert!(!resp.success);
243        assert_eq!(resp.last_log_index, 9);
244        assert_eq!(resp.conflict_index, Some(8));
245        assert_eq!(resp.conflict_term, Some(2));
246    }
247}