raft_io/message.rs
1//! The RPC messages nodes exchange.
2//!
3//! Raft defines two RPCs. [`RequestVote`] drives elections; [`AppendEntries`]
4//! replicates the log and doubles as the leader's heartbeat. Each has a reply.
5//! The protocol core never sends these itself — it emits
6//! [`Action::Send`](crate::Action::Send) carrying a [`Message`], and the caller
7//! delivers it through a [`RaftTransport`](crate::RaftTransport). Keeping the
8//! messages as plain data is what lets a test harness route them in memory and,
9//! later, a framing layer put them on a wire.
10//!
11//! In `v0.2`, [`AppendEntries`] is used only as an empty heartbeat that keeps a
12//! follower from starting a needless election. Carrying real entries — the
13//! replication pipeline — arrives in `v0.3`; the fields are already present so
14//! the wire shape does not change underneath callers.
15
16use crate::types::{Index, LogEntry, NodeId, Snapshot, Term};
17
18/// A candidate's request for a vote in an election.
19///
20/// Sent by a [`Candidate`](crate::Role::Candidate) to every peer when it starts
21/// an election. A recipient grants its vote only if it has not already voted in
22/// this term and the candidate's log is at least as up to date as its own — the
23/// election restriction that keeps a node missing committed entries from
24/// becoming leader.
25///
26/// # Examples
27///
28/// ```
29/// use raft_io::RequestVote;
30///
31/// let rv = RequestVote {
32/// term: 4, candidate: 2, last_log_index: 9, last_log_term: 3, force: false,
33/// };
34/// assert_eq!(rv.candidate, 2);
35/// ```
36#[derive(Clone, Debug, PartialEq, Eq)]
37#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
38pub struct RequestVote {
39 /// The candidate's term.
40 pub term: Term,
41 /// The candidate requesting the vote.
42 pub candidate: NodeId,
43 /// Index of the candidate's last log entry.
44 pub last_log_index: Index,
45 /// Term of the candidate's last log entry.
46 pub last_log_term: Term,
47 /// A forced election, requested by the current leader as part of a
48 /// [leadership transfer](crate::Event::TransferLeadership). A recipient
49 /// honours it even within the leader-stickiness window, so the hand-off is
50 /// not blocked by its own loyalty to the departing leader.
51 pub force: bool,
52}
53
54/// A candidate's *pre-vote* probe, sent before it commits to a real election.
55///
56/// Pre-voting (Raft thesis §9.6) is a disruption guard. Before a node increments
57/// its term and campaigns for real, it asks its peers whether they *would* vote
58/// for it at the next term — without bumping anyone's term. A peer grants only if
59/// it has no active leader and the candidate's log is up to date (the same
60/// election restriction a real vote applies). The candidate runs a real
61/// [`RequestVote`] election only once a quorum of pre-votes says yes.
62///
63/// The point is that a node partitioned away from the cluster never collects a
64/// pre-vote majority, so it never inflates its term. When it rejoins it does not
65/// force the established leader to step down, which is the disruption a plain
66/// election would cause. Unlike [`RequestVote`], a pre-vote changes no persistent
67/// state on either side.
68///
69/// # Examples
70///
71/// ```
72/// use raft_io::PreVote;
73///
74/// // The `term` is the *hypothetical* term the candidate would campaign at —
75/// // one past its current term — not a term it has adopted.
76/// let pv = PreVote { term: 5, candidate: 2, last_log_index: 9, last_log_term: 3 };
77/// assert_eq!(pv.candidate, 2);
78/// ```
79#[derive(Clone, Debug, PartialEq, Eq)]
80#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
81pub struct PreVote {
82 /// The hypothetical term the candidate would campaign at — one past its
83 /// current term. It is *not* a term the candidate has adopted; a recipient
84 /// neither stores it nor steps down for it.
85 pub term: Term,
86 /// The candidate seeking the pre-vote.
87 pub candidate: NodeId,
88 /// Index of the candidate's last log entry.
89 pub last_log_index: Index,
90 /// Term of the candidate's last log entry.
91 pub last_log_term: Term,
92}
93
94/// A peer's response to a [`PreVote`].
95///
96/// `term` is the responder's *current* term, unchanged by the pre-vote. If it
97/// exceeds the pre-candidate's term, the pre-candidate has fallen behind and
98/// abandons the round; otherwise `vote_granted` tells it whether this peer would
99/// support a real election. None of this touches persistent state.
100///
101/// # Examples
102///
103/// ```
104/// use raft_io::PreVoteReply;
105///
106/// let reply = PreVoteReply { term: 4, vote_granted: true, from: 3 };
107/// assert!(reply.vote_granted);
108/// ```
109#[derive(Clone, Debug, PartialEq, Eq)]
110#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
111pub struct PreVoteReply {
112 /// The responder's current term, unchanged by the pre-vote.
113 pub term: Term,
114 /// Whether the responder would grant a real vote under these conditions.
115 pub vote_granted: bool,
116 /// The node that produced this reply.
117 pub from: NodeId,
118}
119
120/// A peer's response to a [`RequestVote`].
121///
122/// `from` names the responder so the candidate can count distinct votes without
123/// depending on transport-level addressing. If `term` exceeds the candidate's
124/// term, the candidate steps down instead of counting the vote.
125///
126/// # Examples
127///
128/// ```
129/// use raft_io::RequestVoteReply;
130///
131/// let reply = RequestVoteReply { term: 4, vote_granted: true, from: 3 };
132/// assert!(reply.vote_granted);
133/// ```
134#[derive(Clone, Debug, PartialEq, Eq)]
135#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
136pub struct RequestVoteReply {
137 /// The responder's current term, for the candidate to update itself.
138 pub term: Term,
139 /// Whether the responder granted its vote.
140 pub vote_granted: bool,
141 /// The node that produced this reply.
142 pub from: NodeId,
143}
144
145/// The leader's replicate-and-heartbeat RPC.
146///
147/// The leader sends this to each follower. With an empty
148/// [`entries`](AppendEntries::entries) list it is a pure heartbeat that asserts
149/// leadership and resets the follower's election timer; with entries it
150/// replicates the log (from `v0.3`). The `prev_log_index` / `prev_log_term`
151/// pair lets the follower verify its log matches the leader's up to that point
152/// before accepting anything new.
153///
154/// # Examples
155///
156/// ```
157/// use raft_io::AppendEntries;
158///
159/// // An empty heartbeat for term 4 from node 1.
160/// let hb = AppendEntries {
161/// term: 4,
162/// leader: 1,
163/// prev_log_index: 9,
164/// prev_log_term: 3,
165/// entries: Vec::new(),
166/// leader_commit: 7,
167/// };
168/// assert!(hb.entries.is_empty());
169/// ```
170#[derive(Clone, Debug, PartialEq, Eq)]
171#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
172pub struct AppendEntries {
173 /// The leader's term.
174 pub term: Term,
175 /// The leader sending the RPC, so followers can record it.
176 pub leader: NodeId,
177 /// Index of the log entry immediately preceding the new ones.
178 pub prev_log_index: Index,
179 /// Term of the entry at `prev_log_index`.
180 pub prev_log_term: Term,
181 /// Entries to store (empty for a heartbeat). Replication uses this in `v0.3`.
182 pub entries: Vec<LogEntry>,
183 /// The leader's commit index, so followers can advance their own.
184 pub leader_commit: Index,
185}
186
187/// A follower's response to an [`AppendEntries`].
188///
189/// `success` is `true` when the follower's log matched at `prev_log_index` and
190/// it accepted the RPC. `match_index` reports the highest log index the
191/// follower now agrees on, which the leader uses to track replication progress.
192///
193/// On a rejection, the `conflict_*` fields let the leader skip the follower's
194/// `next_index` back by a whole term in one round trip instead of decrementing
195/// one entry at a time (the fast-backtracking optimisation from the Raft thesis,
196/// §5.3). They are `0` on success and ignored.
197///
198/// # Examples
199///
200/// ```
201/// use raft_io::AppendEntriesReply;
202///
203/// let ok = AppendEntriesReply {
204/// term: 4,
205/// success: true,
206/// from: 2,
207/// match_index: 9,
208/// conflict_index: 0,
209/// conflict_term: 0,
210/// };
211/// assert!(ok.success);
212/// ```
213#[derive(Clone, Debug, PartialEq, Eq)]
214#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
215pub struct AppendEntriesReply {
216 /// The follower's current term, for the leader to update itself.
217 pub term: Term,
218 /// Whether the follower accepted the RPC.
219 pub success: bool,
220 /// The node that produced this reply.
221 pub from: NodeId,
222 /// Highest log index the follower now matches with the leader.
223 pub match_index: Index,
224 /// On rejection, the index the leader should probe next (the follower's
225 /// first index of `conflict_term`, or its log length plus one when the log
226 /// is simply too short). `0` on success.
227 pub conflict_index: Index,
228 /// On rejection, the term of the follower's entry at `prev_log_index`, or
229 /// `0` when the follower has no entry there. `0` on success.
230 pub conflict_term: Term,
231}
232
233/// A leader's transfer of a [`Snapshot`] to a follower too far behind to
234/// replicate entry by entry.
235///
236/// When a follower's next required entry has already been compacted out of the
237/// leader's log, the leader sends this instead of an [`AppendEntries`]. The
238/// follower installs the snapshot — replacing its state through
239/// `snapshot.index` — then resumes normal replication from the tail.
240///
241/// # Examples
242///
243/// ```
244/// use raft_io::{InstallSnapshot, Snapshot};
245///
246/// let rpc = InstallSnapshot { term: 5, leader: 1, snapshot: Snapshot::new(10, 3, vec![]) };
247/// assert_eq!(rpc.snapshot.index, 10);
248/// ```
249#[derive(Clone, Debug, PartialEq, Eq)]
250#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
251pub struct InstallSnapshot {
252 /// The leader's term.
253 pub term: Term,
254 /// The leader sending the snapshot.
255 pub leader: NodeId,
256 /// The snapshot to install.
257 pub snapshot: Snapshot,
258}
259
260/// A follower's response to an [`InstallSnapshot`].
261///
262/// `last_index` is the snapshot's index the follower has now installed, which
263/// the leader uses to advance that follower's replication progress.
264///
265/// # Examples
266///
267/// ```
268/// use raft_io::InstallSnapshotReply;
269///
270/// let reply = InstallSnapshotReply { term: 5, from: 2, last_index: 10 };
271/// assert_eq!(reply.last_index, 10);
272/// ```
273#[derive(Clone, Debug, PartialEq, Eq)]
274#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
275pub struct InstallSnapshotReply {
276 /// The follower's current term, for the leader to update itself.
277 pub term: Term,
278 /// The node that produced this reply.
279 pub from: NodeId,
280 /// The snapshot index the follower has installed.
281 pub last_index: Index,
282}
283
284/// A leader's signal telling `target` to start an election immediately.
285///
286/// Sent during a [leadership transfer](crate::Event::TransferLeadership): once the
287/// target is fully caught up, the leader sends this so the target campaigns at
288/// once instead of waiting out its election timeout, taking over with minimal
289/// disruption.
290///
291/// # Examples
292///
293/// ```
294/// use raft_io::TimeoutNow;
295///
296/// let rpc = TimeoutNow { term: 5, leader: 1 };
297/// assert_eq!(rpc.leader, 1);
298/// ```
299#[derive(Clone, Debug, PartialEq, Eq)]
300#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
301pub struct TimeoutNow {
302 /// The leader's term.
303 pub term: Term,
304 /// The leader handing off leadership.
305 pub leader: NodeId,
306}
307
308/// Any message a node can send or receive.
309///
310/// Wraps the RPCs and their replies. The enum is
311/// [`#[non_exhaustive]`](https://doc.rust-lang.org/reference/attributes/type_system.html#the-non_exhaustive-attribute):
312/// future versions may add variants, so a `match` over a `Message` must include
313/// a wildcard arm.
314///
315/// # Examples
316///
317/// ```
318/// use raft_io::{Message, RequestVote};
319///
320/// let msg = Message::RequestVote(RequestVote {
321/// term: 1,
322/// candidate: 1,
323/// last_log_index: 0,
324/// last_log_term: 0,
325/// force: false,
326/// });
327/// match msg {
328/// Message::RequestVote(rv) => assert_eq!(rv.term, 1),
329/// _ => unreachable!(),
330/// }
331/// ```
332#[non_exhaustive]
333#[derive(Clone, Debug, PartialEq, Eq)]
334#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
335pub enum Message {
336 /// A candidate is probing for support before a real election.
337 PreVote(PreVote),
338 /// A peer is answering a pre-vote probe.
339 PreVoteReply(PreVoteReply),
340 /// A candidate is asking for a vote.
341 RequestVote(RequestVote),
342 /// A peer is answering a vote request.
343 RequestVoteReply(RequestVoteReply),
344 /// A leader is replicating entries or sending a heartbeat.
345 AppendEntries(AppendEntries),
346 /// A follower is answering an append.
347 AppendEntriesReply(AppendEntriesReply),
348 /// A leader is shipping a snapshot to a far-behind follower.
349 InstallSnapshot(InstallSnapshot),
350 /// A follower is acknowledging an installed snapshot.
351 InstallSnapshotReply(InstallSnapshotReply),
352 /// A leader is handing off leadership, telling the target to campaign now.
353 TimeoutNow(TimeoutNow),
354}
355
356impl Message {
357 /// Returns the term carried by the message, whatever its variant.
358 ///
359 /// The protocol checks the term of every inbound message first — a higher
360 /// term forces the node to step down — so a single accessor avoids matching
361 /// at each call site.
362 ///
363 /// # Examples
364 ///
365 /// ```
366 /// use raft_io::{AppendEntriesReply, Message};
367 ///
368 /// let m = Message::AppendEntriesReply(AppendEntriesReply {
369 /// term: 5,
370 /// success: false,
371 /// from: 2,
372 /// match_index: 0,
373 /// conflict_index: 1,
374 /// conflict_term: 0,
375 /// });
376 /// assert_eq!(m.term(), 5);
377 /// ```
378 #[inline]
379 #[must_use]
380 pub fn term(&self) -> Term {
381 match self {
382 Self::PreVote(m) => m.term,
383 Self::PreVoteReply(m) => m.term,
384 Self::RequestVote(m) => m.term,
385 Self::RequestVoteReply(m) => m.term,
386 Self::AppendEntries(m) => m.term,
387 Self::AppendEntriesReply(m) => m.term,
388 Self::InstallSnapshot(m) => m.term,
389 Self::InstallSnapshotReply(m) => m.term,
390 Self::TimeoutNow(m) => m.term,
391 }
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use super::*;
398
399 #[test]
400 fn test_message_term_reads_each_variant() {
401 assert_eq!(
402 Message::RequestVote(RequestVote {
403 term: 1,
404 candidate: 1,
405 last_log_index: 0,
406 last_log_term: 0,
407 force: false,
408 })
409 .term(),
410 1
411 );
412 assert_eq!(
413 Message::RequestVoteReply(RequestVoteReply {
414 term: 2,
415 vote_granted: true,
416 from: 1
417 })
418 .term(),
419 2
420 );
421 assert_eq!(
422 Message::AppendEntries(AppendEntries {
423 term: 3,
424 leader: 1,
425 prev_log_index: 0,
426 prev_log_term: 0,
427 entries: Vec::new(),
428 leader_commit: 0,
429 })
430 .term(),
431 3
432 );
433 assert_eq!(
434 Message::AppendEntriesReply(AppendEntriesReply {
435 term: 4,
436 success: true,
437 from: 1,
438 match_index: 0,
439 conflict_index: 0,
440 conflict_term: 0,
441 })
442 .term(),
443 4
444 );
445 }
446}