Skip to main content

nodedb_raft/
message.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3/// A single entry in the Raft log.
4///
5/// Each entry carries the term in which it was created and an opaque command
6/// payload. The state machine interprets the payload; Raft only cares about
7/// term and index for consistency.
8#[derive(
9    Debug,
10    Clone,
11    PartialEq,
12    Eq,
13    serde::Serialize,
14    serde::Deserialize,
15    rkyv::Archive,
16    rkyv::Serialize,
17    rkyv::Deserialize,
18    zerompk::ToMessagePack,
19    zerompk::FromMessagePack,
20)]
21pub struct LogEntry {
22    /// The term when this entry was received by the leader.
23    pub term: u64,
24    /// Log index (1-based, monotonically increasing).
25    pub index: u64,
26    /// Opaque command for the state machine. Empty for no-op entries
27    /// (appended by newly elected leaders per Raft paper §5.4.2).
28    pub data: Vec<u8>,
29}
30
31/// AppendEntries RPC (Raft paper Figure 2).
32///
33/// Invoked by leader to replicate log entries; also used as heartbeat
34/// (entries is empty).
35#[derive(
36    Debug,
37    Clone,
38    serde::Serialize,
39    serde::Deserialize,
40    rkyv::Archive,
41    rkyv::Serialize,
42    rkyv::Deserialize,
43    zerompk::ToMessagePack,
44    zerompk::FromMessagePack,
45)]
46pub struct AppendEntriesRequest {
47    /// Leader's term.
48    pub term: u64,
49    /// Leader's ID so followers can redirect clients.
50    pub leader_id: u64,
51    /// Index of log entry immediately preceding new ones.
52    pub prev_log_index: u64,
53    /// Term of prev_log_index entry.
54    pub prev_log_term: u64,
55    /// Log entries to store (empty for heartbeat).
56    pub entries: Vec<LogEntry>,
57    /// Leader's commit_index.
58    pub leader_commit: u64,
59    /// Raft group ID for Multi-Raft routing.
60    pub group_id: u64,
61}
62
63#[derive(
64    Debug,
65    Clone,
66    serde::Serialize,
67    serde::Deserialize,
68    rkyv::Archive,
69    rkyv::Serialize,
70    rkyv::Deserialize,
71    zerompk::ToMessagePack,
72    zerompk::FromMessagePack,
73)]
74pub struct AppendEntriesResponse {
75    /// Current term, for leader to update itself.
76    pub term: u64,
77    /// True if follower contained entry matching prev_log_index and prev_log_term.
78    pub success: bool,
79    /// Optimization: on rejection, the follower's last log index.
80    /// Allows leader to skip back faster than decrementing one-by-one.
81    pub last_log_index: u64,
82}
83
84/// RequestVote RPC (Raft paper Figure 2).
85#[derive(
86    Debug,
87    Clone,
88    serde::Serialize,
89    serde::Deserialize,
90    rkyv::Archive,
91    rkyv::Serialize,
92    rkyv::Deserialize,
93    zerompk::ToMessagePack,
94    zerompk::FromMessagePack,
95)]
96pub struct RequestVoteRequest {
97    /// Candidate's term.
98    pub term: u64,
99    /// Candidate requesting vote.
100    pub candidate_id: u64,
101    /// Index of candidate's last log entry.
102    pub last_log_index: u64,
103    /// Term of candidate's last log entry.
104    pub last_log_term: u64,
105    /// Raft group ID for Multi-Raft routing.
106    pub group_id: u64,
107}
108
109#[derive(
110    Debug,
111    Clone,
112    serde::Serialize,
113    serde::Deserialize,
114    rkyv::Archive,
115    rkyv::Serialize,
116    rkyv::Deserialize,
117    zerompk::ToMessagePack,
118    zerompk::FromMessagePack,
119)]
120pub struct RequestVoteResponse {
121    /// Current term, for candidate to update itself.
122    pub term: u64,
123    /// True means candidate received vote.
124    pub vote_granted: bool,
125}
126
127/// InstallSnapshot RPC (Raft paper Figure 13).
128///
129/// Used when a follower is too far behind for log-based catch-up.
130#[derive(
131    Debug,
132    Clone,
133    serde::Serialize,
134    serde::Deserialize,
135    rkyv::Archive,
136    rkyv::Serialize,
137    rkyv::Deserialize,
138    zerompk::ToMessagePack,
139    zerompk::FromMessagePack,
140)]
141#[msgpack(map)]
142pub struct InstallSnapshotRequest {
143    /// Leader's term.
144    pub term: u64,
145    /// Leader ID.
146    pub leader_id: u64,
147    /// The snapshot replaces all entries up through and including this index.
148    pub last_included_index: u64,
149    /// Term of last_included_index.
150    pub last_included_term: u64,
151    /// Byte offset where chunk is positioned in the snapshot file.
152    pub offset: u64,
153    /// Raw bytes of the snapshot chunk.
154    pub data: Vec<u8>,
155    /// True if this is the last chunk.
156    pub done: bool,
157    /// Raft group ID for Multi-Raft routing.
158    pub group_id: u64,
159    /// Total snapshot size in bytes. `0` means "unknown" (legacy senders or
160    /// bootstrap stubs). Receivers use this only as an advisory hint; they
161    /// must not reject chunks when `total_size == 0`.
162    #[serde(default)]
163    #[msgpack(default)]
164    pub total_size: u64,
165}
166
167#[derive(
168    Debug,
169    Clone,
170    serde::Serialize,
171    serde::Deserialize,
172    rkyv::Archive,
173    rkyv::Serialize,
174    rkyv::Deserialize,
175    zerompk::ToMessagePack,
176    zerompk::FromMessagePack,
177)]
178pub struct InstallSnapshotResponse {
179    /// Current term, for leader to update itself.
180    pub term: u64,
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186
187    #[test]
188    fn log_entry_serde_roundtrip() {
189        let entry = LogEntry {
190            term: 5,
191            index: 42,
192            data: b"put key=val".to_vec(),
193        };
194        let json = sonic_rs::to_string(&entry).unwrap();
195        let decoded: LogEntry = sonic_rs::from_str(&json).unwrap();
196        assert_eq!(entry, decoded);
197    }
198
199    #[test]
200    fn append_entries_heartbeat() {
201        let req = AppendEntriesRequest {
202            term: 3,
203            leader_id: 1,
204            prev_log_index: 10,
205            prev_log_term: 2,
206            entries: vec![],
207            leader_commit: 8,
208            group_id: 0,
209        };
210        assert!(req.entries.is_empty());
211    }
212
213    #[test]
214    fn request_vote_serde_roundtrip() {
215        let req = RequestVoteRequest {
216            term: 7,
217            candidate_id: 2,
218            last_log_index: 100,
219            last_log_term: 6,
220            group_id: 5,
221        };
222        let json = sonic_rs::to_string(&req).unwrap();
223        let decoded: RequestVoteRequest = sonic_rs::from_str(&json).unwrap();
224        assert_eq!(req.term, decoded.term);
225        assert_eq!(req.candidate_id, decoded.candidate_id);
226    }
227}