Skip to main content

raft_io/
message.rs

1//! The RPC messages nodes exchange.
2//!
3//! Raft defines two RPCs. [`RequestVote`] drives elections; [`AppendEntries`]
4//! replicates the log and doubles as the leader's heartbeat. Each has a reply.
5//! The protocol core never sends these itself — it emits
6//! [`Action::Send`](crate::Action::Send) carrying a [`Message`], and the caller
7//! delivers it through a [`RaftTransport`](crate::RaftTransport). Keeping the
8//! messages as plain data is what lets a test harness route them in memory and,
9//! later, a framing layer put them on a wire.
10//!
11//! In `v0.2`, [`AppendEntries`] is used only as an empty heartbeat that keeps a
12//! follower from starting a needless election. Carrying real entries — the
13//! replication pipeline — arrives in `v0.3`; the fields are already present so
14//! the wire shape does not change underneath callers.
15
16use crate::types::{Index, LogEntry, NodeId, Snapshot, Term};
17
18/// A candidate's request for a vote in an election.
19///
20/// Sent by a [`Candidate`](crate::Role::Candidate) to every peer when it starts
21/// an election. A recipient grants its vote only if it has not already voted in
22/// this term and the candidate's log is at least as up to date as its own — the
23/// election restriction that keeps a node missing committed entries from
24/// becoming leader.
25///
26/// # Examples
27///
28/// ```
29/// use raft_io::RequestVote;
30///
31/// let rv = RequestVote {
32///     term: 4, candidate: 2, last_log_index: 9, last_log_term: 3, force: false,
33/// };
34/// assert_eq!(rv.candidate, 2);
35/// ```
36#[derive(Clone, Debug, PartialEq, Eq)]
37#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
38pub struct RequestVote {
39    /// The candidate's term.
40    pub term: Term,
41    /// The candidate requesting the vote.
42    pub candidate: NodeId,
43    /// Index of the candidate's last log entry.
44    pub last_log_index: Index,
45    /// Term of the candidate's last log entry.
46    pub last_log_term: Term,
47    /// A forced election, requested by the current leader as part of a
48    /// [leadership transfer](crate::Event::TransferLeadership). A recipient
49    /// honours it even within the leader-stickiness window, so the hand-off is
50    /// not blocked by its own loyalty to the departing leader.
51    pub force: bool,
52}
53
54/// A candidate's *pre-vote* probe, sent before it commits to a real election.
55///
56/// Pre-voting (Raft thesis §9.6) is a disruption guard. Before a node increments
57/// its term and campaigns for real, it asks its peers whether they *would* vote
58/// for it at the next term — without bumping anyone's term. A peer grants only if
59/// it has no active leader and the candidate's log is up to date (the same
60/// election restriction a real vote applies). The candidate runs a real
61/// [`RequestVote`] election only once a quorum of pre-votes says yes.
62///
63/// The point is that a node partitioned away from the cluster never collects a
64/// pre-vote majority, so it never inflates its term. When it rejoins it does not
65/// force the established leader to step down, which is the disruption a plain
66/// election would cause. Unlike [`RequestVote`], a pre-vote changes no persistent
67/// state on either side.
68///
69/// # Examples
70///
71/// ```
72/// use raft_io::PreVote;
73///
74/// // The `term` is the *hypothetical* term the candidate would campaign at —
75/// // one past its current term — not a term it has adopted.
76/// let pv = PreVote { term: 5, candidate: 2, last_log_index: 9, last_log_term: 3 };
77/// assert_eq!(pv.candidate, 2);
78/// ```
79#[derive(Clone, Debug, PartialEq, Eq)]
80#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
81pub struct PreVote {
82    /// The hypothetical term the candidate would campaign at — one past its
83    /// current term. It is *not* a term the candidate has adopted; a recipient
84    /// neither stores it nor steps down for it.
85    pub term: Term,
86    /// The candidate seeking the pre-vote.
87    pub candidate: NodeId,
88    /// Index of the candidate's last log entry.
89    pub last_log_index: Index,
90    /// Term of the candidate's last log entry.
91    pub last_log_term: Term,
92}
93
94/// A peer's response to a [`PreVote`].
95///
96/// `term` is the responder's *current* term, unchanged by the pre-vote. If it
97/// exceeds the pre-candidate's term, the pre-candidate has fallen behind and
98/// abandons the round; otherwise `vote_granted` tells it whether this peer would
99/// support a real election. None of this touches persistent state.
100///
101/// # Examples
102///
103/// ```
104/// use raft_io::PreVoteReply;
105///
106/// let reply = PreVoteReply { term: 4, vote_granted: true, from: 3 };
107/// assert!(reply.vote_granted);
108/// ```
109#[derive(Clone, Debug, PartialEq, Eq)]
110#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
111pub struct PreVoteReply {
112    /// The responder's current term, unchanged by the pre-vote.
113    pub term: Term,
114    /// Whether the responder would grant a real vote under these conditions.
115    pub vote_granted: bool,
116    /// The node that produced this reply.
117    pub from: NodeId,
118}
119
120/// A peer's response to a [`RequestVote`].
121///
122/// `from` names the responder so the candidate can count distinct votes without
123/// depending on transport-level addressing. If `term` exceeds the candidate's
124/// term, the candidate steps down instead of counting the vote.
125///
126/// # Examples
127///
128/// ```
129/// use raft_io::RequestVoteReply;
130///
131/// let reply = RequestVoteReply { term: 4, vote_granted: true, from: 3 };
132/// assert!(reply.vote_granted);
133/// ```
134#[derive(Clone, Debug, PartialEq, Eq)]
135#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
136pub struct RequestVoteReply {
137    /// The responder's current term, for the candidate to update itself.
138    pub term: Term,
139    /// Whether the responder granted its vote.
140    pub vote_granted: bool,
141    /// The node that produced this reply.
142    pub from: NodeId,
143}
144
145/// The leader's replicate-and-heartbeat RPC.
146///
147/// The leader sends this to each follower. With an empty
148/// [`entries`](AppendEntries::entries) list it is a pure heartbeat that asserts
149/// leadership and resets the follower's election timer; with entries it
150/// replicates the log (from `v0.3`). The `prev_log_index` / `prev_log_term`
151/// pair lets the follower verify its log matches the leader's up to that point
152/// before accepting anything new.
153///
154/// # Examples
155///
156/// ```
157/// use raft_io::AppendEntries;
158///
159/// // An empty heartbeat for term 4 from node 1.
160/// let hb = AppendEntries {
161///     term: 4,
162///     leader: 1,
163///     prev_log_index: 9,
164///     prev_log_term: 3,
165///     entries: Vec::new(),
166///     leader_commit: 7,
167/// };
168/// assert!(hb.entries.is_empty());
169/// ```
170#[derive(Clone, Debug, PartialEq, Eq)]
171#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
172pub struct AppendEntries {
173    /// The leader's term.
174    pub term: Term,
175    /// The leader sending the RPC, so followers can record it.
176    pub leader: NodeId,
177    /// Index of the log entry immediately preceding the new ones.
178    pub prev_log_index: Index,
179    /// Term of the entry at `prev_log_index`.
180    pub prev_log_term: Term,
181    /// Entries to store (empty for a heartbeat). Replication uses this in `v0.3`.
182    pub entries: Vec<LogEntry>,
183    /// The leader's commit index, so followers can advance their own.
184    pub leader_commit: Index,
185}
186
187/// A follower's response to an [`AppendEntries`].
188///
189/// `success` is `true` when the follower's log matched at `prev_log_index` and
190/// it accepted the RPC. `match_index` reports the highest log index the
191/// follower now agrees on, which the leader uses to track replication progress.
192///
193/// On a rejection, the `conflict_*` fields let the leader skip the follower's
194/// `next_index` back by a whole term in one round trip instead of decrementing
195/// one entry at a time (the fast-backtracking optimisation from the Raft thesis,
196/// §5.3). They are `0` on success and ignored.
197///
198/// # Examples
199///
200/// ```
201/// use raft_io::AppendEntriesReply;
202///
203/// let ok = AppendEntriesReply {
204///     term: 4,
205///     success: true,
206///     from: 2,
207///     match_index: 9,
208///     conflict_index: 0,
209///     conflict_term: 0,
210/// };
211/// assert!(ok.success);
212/// ```
213#[derive(Clone, Debug, PartialEq, Eq)]
214#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
215pub struct AppendEntriesReply {
216    /// The follower's current term, for the leader to update itself.
217    pub term: Term,
218    /// Whether the follower accepted the RPC.
219    pub success: bool,
220    /// The node that produced this reply.
221    pub from: NodeId,
222    /// Highest log index the follower now matches with the leader.
223    pub match_index: Index,
224    /// On rejection, the index the leader should probe next (the follower's
225    /// first index of `conflict_term`, or its log length plus one when the log
226    /// is simply too short). `0` on success.
227    pub conflict_index: Index,
228    /// On rejection, the term of the follower's entry at `prev_log_index`, or
229    /// `0` when the follower has no entry there. `0` on success.
230    pub conflict_term: Term,
231}
232
233/// A leader's transfer of a [`Snapshot`] to a follower too far behind to
234/// replicate entry by entry.
235///
236/// When a follower's next required entry has already been compacted out of the
237/// leader's log, the leader sends this instead of an [`AppendEntries`]. The
238/// follower installs the snapshot — replacing its state through
239/// `snapshot.index` — then resumes normal replication from the tail.
240///
241/// # Examples
242///
243/// ```
244/// use raft_io::{InstallSnapshot, Snapshot};
245///
246/// let rpc = InstallSnapshot { term: 5, leader: 1, snapshot: Snapshot::new(10, 3, vec![]) };
247/// assert_eq!(rpc.snapshot.index, 10);
248/// ```
249#[derive(Clone, Debug, PartialEq, Eq)]
250#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
251pub struct InstallSnapshot {
252    /// The leader's term.
253    pub term: Term,
254    /// The leader sending the snapshot.
255    pub leader: NodeId,
256    /// The snapshot to install.
257    pub snapshot: Snapshot,
258}
259
260/// A follower's response to an [`InstallSnapshot`].
261///
262/// `last_index` is the snapshot's index the follower has now installed, which
263/// the leader uses to advance that follower's replication progress.
264///
265/// # Examples
266///
267/// ```
268/// use raft_io::InstallSnapshotReply;
269///
270/// let reply = InstallSnapshotReply { term: 5, from: 2, last_index: 10 };
271/// assert_eq!(reply.last_index, 10);
272/// ```
273#[derive(Clone, Debug, PartialEq, Eq)]
274#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
275pub struct InstallSnapshotReply {
276    /// The follower's current term, for the leader to update itself.
277    pub term: Term,
278    /// The node that produced this reply.
279    pub from: NodeId,
280    /// The snapshot index the follower has installed.
281    pub last_index: Index,
282}
283
284/// A leader's signal telling `target` to start an election immediately.
285///
286/// Sent during a [leadership transfer](crate::Event::TransferLeadership): once the
287/// target is fully caught up, the leader sends this so the target campaigns at
288/// once instead of waiting out its election timeout, taking over with minimal
289/// disruption.
290///
291/// # Examples
292///
293/// ```
294/// use raft_io::TimeoutNow;
295///
296/// let rpc = TimeoutNow { term: 5, leader: 1 };
297/// assert_eq!(rpc.leader, 1);
298/// ```
299#[derive(Clone, Debug, PartialEq, Eq)]
300#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
301pub struct TimeoutNow {
302    /// The leader's term.
303    pub term: Term,
304    /// The leader handing off leadership.
305    pub leader: NodeId,
306}
307
308/// Any message a node can send or receive.
309///
310/// Wraps the RPCs and their replies. The enum is
311/// [`#[non_exhaustive]`](https://doc.rust-lang.org/reference/attributes/type_system.html#the-non_exhaustive-attribute):
312/// future versions may add variants, so a `match` over a `Message` must include
313/// a wildcard arm.
314///
315/// # Examples
316///
317/// ```
318/// use raft_io::{Message, RequestVote};
319///
320/// let msg = Message::RequestVote(RequestVote {
321///     term: 1,
322///     candidate: 1,
323///     last_log_index: 0,
324///     last_log_term: 0,
325///     force: false,
326/// });
327/// match msg {
328///     Message::RequestVote(rv) => assert_eq!(rv.term, 1),
329///     _ => unreachable!(),
330/// }
331/// ```
332#[non_exhaustive]
333#[derive(Clone, Debug, PartialEq, Eq)]
334#[cfg_attr(feature = "framing", derive(pack_io::Serialize, pack_io::Deserialize))]
335pub enum Message {
336    /// A candidate is probing for support before a real election.
337    PreVote(PreVote),
338    /// A peer is answering a pre-vote probe.
339    PreVoteReply(PreVoteReply),
340    /// A candidate is asking for a vote.
341    RequestVote(RequestVote),
342    /// A peer is answering a vote request.
343    RequestVoteReply(RequestVoteReply),
344    /// A leader is replicating entries or sending a heartbeat.
345    AppendEntries(AppendEntries),
346    /// A follower is answering an append.
347    AppendEntriesReply(AppendEntriesReply),
348    /// A leader is shipping a snapshot to a far-behind follower.
349    InstallSnapshot(InstallSnapshot),
350    /// A follower is acknowledging an installed snapshot.
351    InstallSnapshotReply(InstallSnapshotReply),
352    /// A leader is handing off leadership, telling the target to campaign now.
353    TimeoutNow(TimeoutNow),
354}
355
356impl Message {
357    /// Returns the term carried by the message, whatever its variant.
358    ///
359    /// The protocol checks the term of every inbound message first — a higher
360    /// term forces the node to step down — so a single accessor avoids matching
361    /// at each call site.
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use raft_io::{AppendEntriesReply, Message};
367    ///
368    /// let m = Message::AppendEntriesReply(AppendEntriesReply {
369    ///     term: 5,
370    ///     success: false,
371    ///     from: 2,
372    ///     match_index: 0,
373    ///     conflict_index: 1,
374    ///     conflict_term: 0,
375    /// });
376    /// assert_eq!(m.term(), 5);
377    /// ```
378    #[inline]
379    #[must_use]
380    pub fn term(&self) -> Term {
381        match self {
382            Self::PreVote(m) => m.term,
383            Self::PreVoteReply(m) => m.term,
384            Self::RequestVote(m) => m.term,
385            Self::RequestVoteReply(m) => m.term,
386            Self::AppendEntries(m) => m.term,
387            Self::AppendEntriesReply(m) => m.term,
388            Self::InstallSnapshot(m) => m.term,
389            Self::InstallSnapshotReply(m) => m.term,
390            Self::TimeoutNow(m) => m.term,
391        }
392    }
393}
394
395#[cfg(test)]
396mod tests {
397    use super::*;
398
399    #[test]
400    fn test_message_term_reads_each_variant() {
401        assert_eq!(
402            Message::RequestVote(RequestVote {
403                term: 1,
404                candidate: 1,
405                last_log_index: 0,
406                last_log_term: 0,
407                force: false,
408            })
409            .term(),
410            1
411        );
412        assert_eq!(
413            Message::RequestVoteReply(RequestVoteReply {
414                term: 2,
415                vote_granted: true,
416                from: 1
417            })
418            .term(),
419            2
420        );
421        assert_eq!(
422            Message::AppendEntries(AppendEntries {
423                term: 3,
424                leader: 1,
425                prev_log_index: 0,
426                prev_log_term: 0,
427                entries: Vec::new(),
428                leader_commit: 0,
429            })
430            .term(),
431            3
432        );
433        assert_eq!(
434            Message::AppendEntriesReply(AppendEntriesReply {
435                term: 4,
436                success: true,
437                from: 1,
438                match_index: 0,
439                conflict_index: 0,
440                conflict_term: 0,
441            })
442            .term(),
443            4
444        );
445    }
446}