ruvector_raft/
rpc.rs

1//! Raft RPC messages
2//!
3//! Defines the RPC message types for Raft consensus:
4//! - AppendEntries (log replication and heartbeat)
5//! - RequestVote (leader election)
6//! - InstallSnapshot (snapshot transfer)
7
8use crate::{log::LogEntry, log::Snapshot, LogIndex, NodeId, Term};
9use serde::{Deserialize, Serialize};
10
11/// AppendEntries RPC request
12///
13/// Invoked by leader to replicate log entries; also used as heartbeat
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct AppendEntriesRequest {
16    /// Leader's term
17    pub term: Term,
18
19    /// Leader's ID (so followers can redirect clients)
20    pub leader_id: NodeId,
21
22    /// Index of log entry immediately preceding new ones
23    pub prev_log_index: LogIndex,
24
25    /// Term of prevLogIndex entry
26    pub prev_log_term: Term,
27
28    /// Log entries to store (empty for heartbeat)
29    pub entries: Vec<LogEntry>,
30
31    /// Leader's commitIndex
32    pub leader_commit: LogIndex,
33}
34
35impl AppendEntriesRequest {
36    /// Create a new AppendEntries request
37    pub fn new(
38        term: Term,
39        leader_id: NodeId,
40        prev_log_index: LogIndex,
41        prev_log_term: Term,
42        entries: Vec<LogEntry>,
43        leader_commit: LogIndex,
44    ) -> Self {
45        Self {
46            term,
47            leader_id,
48            prev_log_index,
49            prev_log_term,
50            entries,
51            leader_commit,
52        }
53    }
54
55    /// Create a heartbeat (AppendEntries with no entries)
56    pub fn heartbeat(term: Term, leader_id: NodeId, leader_commit: LogIndex) -> Self {
57        Self {
58            term,
59            leader_id,
60            prev_log_index: 0,
61            prev_log_term: 0,
62            entries: Vec::new(),
63            leader_commit,
64        }
65    }
66
67    /// Check if this is a heartbeat message
68    pub fn is_heartbeat(&self) -> bool {
69        self.entries.is_empty()
70    }
71
72    /// Serialize to bytes
73    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
74        use bincode::config;
75        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
76    }
77
78    /// Deserialize from bytes
79    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
80        use bincode::config;
81        let (compat, _): (bincode::serde::Compat<Self>, _) =
82            bincode::decode_from_slice(bytes, config::standard())?;
83        Ok(compat.0)
84    }
85}
86
87/// AppendEntries RPC response
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct AppendEntriesResponse {
90    /// Current term, for leader to update itself
91    pub term: Term,
92
93    /// True if follower contained entry matching prevLogIndex and prevLogTerm
94    pub success: bool,
95
96    /// The follower's last log index (for optimization)
97    pub match_index: Option<LogIndex>,
98
99    /// Conflict information for faster log backtracking
100    pub conflict_index: Option<LogIndex>,
101    pub conflict_term: Option<Term>,
102}
103
104impl AppendEntriesResponse {
105    /// Create a successful response
106    pub fn success(term: Term, match_index: LogIndex) -> Self {
107        Self {
108            term,
109            success: true,
110            match_index: Some(match_index),
111            conflict_index: None,
112            conflict_term: None,
113        }
114    }
115
116    /// Create a failure response
117    pub fn failure(
118        term: Term,
119        conflict_index: Option<LogIndex>,
120        conflict_term: Option<Term>,
121    ) -> Self {
122        Self {
123            term,
124            success: false,
125            match_index: None,
126            conflict_index,
127            conflict_term,
128        }
129    }
130
131    /// Serialize to bytes
132    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
133        use bincode::config;
134        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
135    }
136
137    /// Deserialize from bytes
138    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
139        use bincode::config;
140        let (compat, _): (bincode::serde::Compat<Self>, _) =
141            bincode::decode_from_slice(bytes, config::standard())?;
142        Ok(compat.0)
143    }
144}
145
146/// RequestVote RPC request
147///
148/// Invoked by candidates to gather votes
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct RequestVoteRequest {
151    /// Candidate's term
152    pub term: Term,
153
154    /// Candidate requesting vote
155    pub candidate_id: NodeId,
156
157    /// Index of candidate's last log entry
158    pub last_log_index: LogIndex,
159
160    /// Term of candidate's last log entry
161    pub last_log_term: Term,
162}
163
164impl RequestVoteRequest {
165    /// Create a new RequestVote request
166    pub fn new(
167        term: Term,
168        candidate_id: NodeId,
169        last_log_index: LogIndex,
170        last_log_term: Term,
171    ) -> Self {
172        Self {
173            term,
174            candidate_id,
175            last_log_index,
176            last_log_term,
177        }
178    }
179
180    /// Serialize to bytes
181    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
182        use bincode::config;
183        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
184    }
185
186    /// Deserialize from bytes
187    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
188        use bincode::config;
189        let (compat, _): (bincode::serde::Compat<Self>, _) =
190            bincode::decode_from_slice(bytes, config::standard())?;
191        Ok(compat.0)
192    }
193}
194
195/// RequestVote RPC response
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct RequestVoteResponse {
198    /// Current term, for candidate to update itself
199    pub term: Term,
200
201    /// True means candidate received vote
202    pub vote_granted: bool,
203}
204
205impl RequestVoteResponse {
206    /// Create a vote granted response
207    pub fn granted(term: Term) -> Self {
208        Self {
209            term,
210            vote_granted: true,
211        }
212    }
213
214    /// Create a vote denied response
215    pub fn denied(term: Term) -> Self {
216        Self {
217            term,
218            vote_granted: false,
219        }
220    }
221
222    /// Serialize to bytes
223    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
224        use bincode::config;
225        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
226    }
227
228    /// Deserialize from bytes
229    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
230        use bincode::config;
231        let (compat, _): (bincode::serde::Compat<Self>, _) =
232            bincode::decode_from_slice(bytes, config::standard())?;
233        Ok(compat.0)
234    }
235}
236
237/// InstallSnapshot RPC request
238///
239/// Invoked by leader to send chunks of a snapshot to a follower
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct InstallSnapshotRequest {
242    /// Leader's term
243    pub term: Term,
244
245    /// Leader's ID (so follower can redirect clients)
246    pub leader_id: NodeId,
247
248    /// The snapshot replaces all entries up through and including this index
249    pub last_included_index: LogIndex,
250
251    /// Term of lastIncludedIndex
252    pub last_included_term: Term,
253
254    /// Byte offset where chunk is positioned in the snapshot file
255    pub offset: u64,
256
257    /// Raw bytes of the snapshot chunk, starting at offset
258    pub data: Vec<u8>,
259
260    /// True if this is the last chunk
261    pub done: bool,
262}
263
264impl InstallSnapshotRequest {
265    /// Create a new InstallSnapshot request
266    pub fn new(
267        term: Term,
268        leader_id: NodeId,
269        snapshot: Snapshot,
270        offset: u64,
271        chunk_size: usize,
272    ) -> Self {
273        let data_len = snapshot.data.len();
274        let chunk_end = std::cmp::min(offset as usize + chunk_size, data_len);
275        let chunk = snapshot.data[offset as usize..chunk_end].to_vec();
276        let done = chunk_end >= data_len;
277
278        Self {
279            term,
280            leader_id,
281            last_included_index: snapshot.last_included_index,
282            last_included_term: snapshot.last_included_term,
283            offset,
284            data: chunk,
285            done,
286        }
287    }
288
289    /// Serialize to bytes
290    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
291        use bincode::config;
292        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
293    }
294
295    /// Deserialize from bytes
296    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
297        use bincode::config;
298        let (compat, _): (bincode::serde::Compat<Self>, _) =
299            bincode::decode_from_slice(bytes, config::standard())?;
300        Ok(compat.0)
301    }
302}
303
304/// InstallSnapshot RPC response
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct InstallSnapshotResponse {
307    /// Current term, for leader to update itself
308    pub term: Term,
309
310    /// True if snapshot was successfully installed
311    pub success: bool,
312
313    /// The byte offset for the next chunk (for resume)
314    pub next_offset: Option<u64>,
315}
316
317impl InstallSnapshotResponse {
318    /// Create a successful response
319    pub fn success(term: Term, next_offset: Option<u64>) -> Self {
320        Self {
321            term,
322            success: true,
323            next_offset,
324        }
325    }
326
327    /// Create a failure response
328    pub fn failure(term: Term) -> Self {
329        Self {
330            term,
331            success: false,
332            next_offset: None,
333        }
334    }
335
336    /// Serialize to bytes
337    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
338        use bincode::config;
339        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
340    }
341
342    /// Deserialize from bytes
343    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
344        use bincode::config;
345        let (compat, _): (bincode::serde::Compat<Self>, _) =
346            bincode::decode_from_slice(bytes, config::standard())?;
347        Ok(compat.0)
348    }
349}
350
351/// RPC message envelope
352#[derive(Debug, Clone, Serialize, Deserialize)]
353pub enum RaftMessage {
354    AppendEntriesRequest(AppendEntriesRequest),
355    AppendEntriesResponse(AppendEntriesResponse),
356    RequestVoteRequest(RequestVoteRequest),
357    RequestVoteResponse(RequestVoteResponse),
358    InstallSnapshotRequest(InstallSnapshotRequest),
359    InstallSnapshotResponse(InstallSnapshotResponse),
360}
361
362impl RaftMessage {
363    /// Get the term from the message
364    pub fn term(&self) -> Term {
365        match self {
366            RaftMessage::AppendEntriesRequest(req) => req.term,
367            RaftMessage::AppendEntriesResponse(resp) => resp.term,
368            RaftMessage::RequestVoteRequest(req) => req.term,
369            RaftMessage::RequestVoteResponse(resp) => resp.term,
370            RaftMessage::InstallSnapshotRequest(req) => req.term,
371            RaftMessage::InstallSnapshotResponse(resp) => resp.term,
372        }
373    }
374
375    /// Serialize to bytes
376    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError> {
377        use bincode::config;
378        bincode::encode_to_vec(bincode::serde::Compat(self), config::standard())
379    }
380
381    /// Deserialize from bytes
382    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError> {
383        use bincode::config;
384        let (compat, _): (bincode::serde::Compat<Self>, _) =
385            bincode::decode_from_slice(bytes, config::standard())?;
386        Ok(compat.0)
387    }
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393
394    #[test]
395    fn test_append_entries_heartbeat() {
396        let req = AppendEntriesRequest::heartbeat(1, "leader".to_string(), 10);
397        assert!(req.is_heartbeat());
398        assert_eq!(req.entries.len(), 0);
399    }
400
401    #[test]
402    fn test_append_entries_serialization() {
403        let req = AppendEntriesRequest::new(1, "leader".to_string(), 10, 1, vec![], 10);
404
405        let bytes = req.to_bytes().unwrap();
406        let decoded = AppendEntriesRequest::from_bytes(&bytes).unwrap();
407
408        assert_eq!(req.term, decoded.term);
409        assert_eq!(req.leader_id, decoded.leader_id);
410    }
411
412    #[test]
413    fn test_request_vote_serialization() {
414        let req = RequestVoteRequest::new(2, "candidate".to_string(), 15, 2);
415
416        let bytes = req.to_bytes().unwrap();
417        let decoded = RequestVoteRequest::from_bytes(&bytes).unwrap();
418
419        assert_eq!(req.term, decoded.term);
420        assert_eq!(req.candidate_id, decoded.candidate_id);
421    }
422
423    #[test]
424    fn test_response_types() {
425        let success = AppendEntriesResponse::success(1, 10);
426        assert!(success.success);
427        assert_eq!(success.match_index, Some(10));
428
429        let failure = AppendEntriesResponse::failure(1, Some(5), Some(1));
430        assert!(!failure.success);
431        assert_eq!(failure.conflict_index, Some(5));
432    }
433
434    #[test]
435    fn test_vote_responses() {
436        let granted = RequestVoteResponse::granted(1);
437        assert!(granted.vote_granted);
438
439        let denied = RequestVoteResponse::denied(1);
440        assert!(!denied.vote_granted);
441    }
442}