Skip to main content

raft_io/
node.rs

1//! The deterministic protocol core: [`RaftNode`], [`Event`], and [`Action`].
2//!
3//! A [`RaftNode`] is a pure state machine. You drive it with [`step`], handing
4//! it one [`Event`] — a logical [`Tick`], an inbound [`Message`], or a client
5//! [`Propose`] — and it returns the [`Action`]s the outside world must carry
6//! out: messages to send and committed commands to apply. It never reads a
7//! clock, opens a socket, or touches a disk; all of that is the caller's job,
8//! reached through the [`RaftLog`] it owns and the
9//! [`RaftTransport`](crate::RaftTransport) the caller drives. That is what makes
10//! the protocol reproducible from a seed and a sequence of events.
11//!
12//! # Scope at v0.5
13//!
14//! The protocol is feature-complete bar membership changes (`v0.6`): leader
15//! election with term and vote safety, the full replication pipeline (batched
16//! `AppendEntries`, per-follower progress with optimistic pipelining,
17//! conflict-hint backtracking, commit on a quorum), durable persistence and
18//! crash recovery (the `WalLog`), and **snapshots with log compaction** — a
19//! policy hint drives the application to snapshot, the log compacts behind it,
20//! and a follower too far behind to replicate is caught up with an
21//! `InstallSnapshot`.
22//!
23//! [`step`]: RaftNode::step
24//! [`Tick`]: Event::Tick
25//! [`Propose`]: Event::Propose
26//! [`Message`]: crate::Message
27//! [`AppendEntries`]: crate::AppendEntries
28//! [`RaftLog`]: crate::RaftLog
29
30use crate::config::RaftConfig;
31use crate::error::{Error, Result};
32use crate::log::{MemoryLog, RaftLog};
33use crate::message::{
34    AppendEntries, AppendEntriesReply, InstallSnapshot, InstallSnapshotReply, Message, PreVote,
35    PreVoteReply, RequestVote, RequestVoteReply, TimeoutNow,
36};
37use crate::rng::Rng;
38use crate::types::{HardState, Index, LogEntry, NodeId, Role, Snapshot, Term};
39
40/// Collects node ids into a sorted, de-duplicated configuration vector, so two
41/// nodes that agree on the membership store it in the same order.
42fn sorted_members(ids: impl IntoIterator<Item = NodeId>) -> Vec<NodeId> {
43    let mut v: Vec<NodeId> = ids.into_iter().collect();
44    v.sort_unstable();
45    v.dedup();
46    v
47}
48
49/// An input handed to [`RaftNode::step`].
50///
51/// A node only ever changes state in response to an event. There are exactly
52/// three, matching Raft's three sources of progress: the passage of (logical)
53/// time, a message from a peer, and a request from a client.
54///
55/// # Examples
56///
57/// ```
58/// use raft_io::{Event, Message, RequestVote};
59///
60/// let _tick = Event::Tick;
61/// let _propose = Event::Propose(b"command".to_vec());
62/// let _msg = Event::Message(Message::RequestVote(RequestVote {
63///     term: 1, candidate: 2, last_log_index: 0, last_log_term: 0, force: false,
64/// }));
65/// ```
66pub enum Event {
67    /// One logical clock tick. The caller decides the wall-clock interval.
68    Tick,
69    /// A message arrived from a peer.
70    Message(Message),
71    /// A client proposes a command to be replicated and applied.
72    ///
73    /// Only a leader may accept a proposal; on any other node
74    /// [`step`](RaftNode::step) returns [`Error::NotLeader`].
75    Propose(Vec<u8>),
76    /// The application supplies a snapshot of its state machine through `index`.
77    ///
78    /// This is the reply to an [`Action::Snapshot`] hint: the application has
79    /// serialized its state up to `index` into `data`. The node compacts the log
80    /// up to `index`. A snapshot for an uncommitted or stale index is ignored.
81    Snapshot {
82        /// The log index the snapshot covers (must be applied and committed).
83        index: Index,
84        /// The serialized state machine state.
85        data: Vec<u8>,
86    },
87    /// Add a voting server to the cluster.
88    ///
89    /// Only the leader may reconfigure; elsewhere [`step`](RaftNode::step)
90    /// returns [`Error::NotLeader`]. One change is processed at a time — a request
91    /// made while a previous configuration change is still uncommitted returns
92    /// [`Error::ConfigInProgress`]. Adding a server already present is a no-op.
93    AddServer(NodeId),
94    /// Remove a voting server from the cluster.
95    ///
96    /// Same rules as [`AddServer`](Event::AddServer). Removing the leader makes
97    /// it step down once the change commits.
98    RemoveServer(NodeId),
99    /// Ask the leader to transfer leadership to `target`.
100    ///
101    /// The leader brings `target` fully up to date, then signals it to start an
102    /// election immediately so it takes over with minimal disruption. A no-op on
103    /// a non-leader or when `target` is not a voter.
104    TransferLeadership(NodeId),
105}
106
/// An instruction [`RaftNode::step`] returns for the caller to carry out.
///
/// The node decides *what* must happen; the caller makes it happen. Execute the
/// actions in the order returned: any state the protocol depends on has already
/// been persisted through the [`RaftLog`](crate::RaftLog) before a
/// [`Send`](Action::Send) is emitted, so honouring the order preserves Raft's
/// durability rule.
///
/// The enum is [`#[non_exhaustive]`](https://doc.rust-lang.org/reference/attributes/type_system.html#the-non_exhaustive-attribute):
/// future versions may add variants, so a `match` outside this crate must
/// include a wildcard arm.
///
/// # Examples
///
/// ```
/// use raft_io::{Action, Event, RaftConfig, RaftNode};
///
/// let mut node = RaftNode::new(RaftConfig::single(1));
/// while !node.is_leader() {
///     let _ = node.step(Event::Tick).unwrap();
/// }
/// for action in node.step(Event::Propose(b"x".to_vec())).unwrap() {
///     match action {
///         Action::Send { to, message } => { let _ = (to, message); }
///         Action::Apply { index, term, command } => { let _ = (index, term, command); }
///         _ => {}
///     }
/// }
/// ```
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Action {
    /// Send `message` to node `to` via the transport.
    Send {
        /// Destination node.
        to: NodeId,
        /// The message to deliver.
        message: Message,
    },
    /// Apply a committed command to the application state machine.
    ///
    /// Applies are emitted in strictly increasing index order and each index is
    /// emitted at most once, so the caller can apply them blindly in sequence.
    Apply {
        /// Index of the committed entry.
        index: Index,
        /// Term the entry was created in.
        term: Term,
        /// The opaque command bytes to apply.
        command: Vec<u8>,
    },
    /// Take a snapshot of the state machine through `index` and return it.
    ///
    /// A hint emitted when the log has grown past the configured snapshot
    /// threshold. The application serializes its state up to `index` and feeds it
    /// back with [`Event::Snapshot`], after which the node compacts the log.
    /// Acting on the hint is optional, but ignoring it lets the log grow without
    /// bound.
    Snapshot {
        /// The applied index the snapshot should cover.
        index: Index,
        /// Term of the entry at `index`.
        term: Term,
    },
    /// Reset the state machine to an installed snapshot.
    ///
    /// Emitted on a follower that received a leader's snapshot because it had
    /// fallen too far behind to replicate entry by entry. The application
    /// replaces its state with `data` (which represents the state through
    /// `index`); subsequent [`Apply`](Action::Apply) actions resume from
    /// `index + 1`.
    RestoreSnapshot {
        /// The index the snapshot covers.
        index: Index,
        /// Term of the entry at `index`.
        term: Term,
        /// The serialized state to restore.
        data: Vec<u8>,
    },
    /// The cluster's voting membership changed.
    ///
    /// Emitted whenever the node adopts a configuration that differs from the
    /// one it held (as a leader appending the change, or a follower receiving
    /// it); adopting an identical membership emits nothing. The application
    /// should update its transport so it can reach the new members and stop
    /// reaching removed ones. Membership takes effect immediately on this
    /// action, before the change commits.
    MembershipChanged {
        /// The new voting membership.
        members: Vec<NodeId>,
    },
}
197
/// How a leader is replicating to one follower.
///
/// A leader does not know where a new follower's log diverges from its own, so
/// it starts in `Probe`: it sends conservatively and waits for each reply,
/// backtracking on rejection until an append is accepted. Once the match point
/// is found it switches to `Replicate` and streams entries, advancing
/// optimistically without waiting — the pipelining that gives steady-state
/// throughput.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProgressState {
    /// The match point is not yet known. `send_append` leaves `next_index`
    /// untouched after a send, and `replicate_to_streaming` skips the peer. This
    /// is also the state a peer is put in when it is sent a snapshot.
    Probe,
    /// The match point is known and entries are streamed. A non-empty
    /// `send_append` advances `next_index` past the batch it just sent.
    Replicate,
}
211
/// The leader's view of one follower's replication progress.
///
/// `rebuild_progress` creates an entry for a newly tracked peer with
/// `next_index` one past the leader's last log index, `match_index` at `0`, and
/// `state` set to `Probe`; entries for peers that stay in the configuration are
/// carried over unchanged.
#[derive(Clone, Copy, Debug)]
struct Progress {
    /// The follower this progress tracks.
    id: NodeId,
    /// Index of the next entry to send this follower.
    next_index: Index,
    /// Highest index known to be replicated on this follower.
    match_index: Index,
    /// Whether we are still probing for the match point or streaming.
    state: ProgressState,
}
224
/// A node in a Raft cluster: the deterministic consensus state machine.
///
/// Create one with [`new`](RaftNode::new) (Tier 1, in-memory log) or
/// [`with_log`](RaftNode::with_log) (Tier 3, your own [`RaftLog`]), then drive
/// it with [`step`](RaftNode::step). The generic `L` defaults to [`MemoryLog`],
/// so the common case never has to name it.
///
/// # Examples
///
/// ```
/// use raft_io::{Event, RaftConfig, RaftNode};
///
/// let mut node = RaftNode::new(RaftConfig::single(1));
/// assert!(!node.is_leader());
/// while !node.is_leader() {
///     let _ = node.step(Event::Tick).unwrap();
/// }
/// assert!(node.is_leader());
/// ```
pub struct RaftNode<L: RaftLog = MemoryLog> {
    /// This node's own id, taken from the configuration at construction.
    id: NodeId,
    /// Current voting membership (includes this node when it is a voter). The
    /// quorum and election logic read from this; it changes as configuration
    /// entries are appended.
    voters: Vec<NodeId>,
    /// The configuration in effect at the snapshot base (or the bootstrap
    /// configuration when there is no snapshot). Entries below the base are gone,
    /// so this anchors configuration recovery.
    base_config: Vec<NodeId>,
    /// Index of the configuration entry currently in effect, or `0` when the
    /// configuration comes from `base_config` rather than a live log entry.
    config_index: Index,
    /// Lower bound handed to the rng when the election timeout is drawn.
    election_timeout_min: u32,
    /// Upper bound handed to the rng when the election timeout is drawn.
    election_timeout_max: u32,
    /// Number of ticks between a leader's heartbeat rounds of appends.
    heartbeat_interval: u32,
    /// Maximum number of entries carried by one `AppendEntries`.
    max_batch: usize,
    /// Copied from the configuration. Presumably the log-growth limit behind the
    /// [`Action::Snapshot`] hint — the policy itself is not in this part of the
    /// file (TODO confirm).
    snapshot_threshold: usize,

    /// The log store this node owns; also the source of the persisted hard
    /// state and the current snapshot.
    log: L,
    /// The role currently played; a freshly constructed node is a follower.
    role: Role,
    /// The current term, restored from the log's hard state at construction.
    current_term: Term,
    /// The vote cast in the current term, restored from the log's hard state at
    /// construction.
    voted_for: Option<NodeId>,
    /// The leader this node currently recognises, if any.
    leader_id: Option<NodeId>,
    /// Highest index known to be committed; starts at the snapshot index.
    commit_index: Index,
    /// Highest index applied; starts at the snapshot index.
    last_applied: Index,

    /// Ticks counted toward the election timeout while a follower or candidate.
    election_elapsed: u32,
    /// Ticks counted toward the next heartbeat round while the leader.
    heartbeat_elapsed: u32,
    /// The tick count at which `election_elapsed` opens a pre-vote round; drawn
    /// from the rng between the configured minimum and maximum.
    election_timeout: u32,
    /// Node ids counted as votes for this node in the current election;
    /// `start_election` resets it to just this node's own id.
    votes: Vec<NodeId>,
    /// Whether a pre-vote round is outstanding. While set, the node has not yet
    /// incremented its term — it is only probing whether a real election could
    /// be won (Raft §9.6). It remains a [`Follower`](Role::Follower) throughout.
    pre_voting: bool,
    /// Distinct peers that have granted the current pre-vote round.
    pre_votes: Vec<NodeId>,
    /// Per-peer replication progress: one entry for each voter other than this
    /// node, in `voters` order (see `rebuild_progress`). Non-empty only while
    /// this node is the leader.
    progress: Vec<Progress>,
    /// Highest index a snapshot hint has already been emitted for, so the policy
    /// fires at most once per threshold crossing.
    snapshot_hinted_at: Index,
    /// The target of an in-progress leadership transfer, if any. While set, the
    /// leader declines new proposals so the transfer can complete.
    transfer_target: Option<NodeId>,
    /// Deterministic random source, seeded from the configuration's seed.
    rng: Rng,
}
292
293impl RaftNode<MemoryLog> {
294    /// Creates a node from `config` backed by an in-memory [`MemoryLog`].
295    ///
296    /// This is the Tier-1 entry point: one call, no generic to name, no I/O to
297    /// wire up. The node starts as a [`Follower`](Role::Follower) in term `0`.
298    ///
299    /// # Examples
300    ///
301    /// ```
302    /// use raft_io::{RaftConfig, RaftNode, Role};
303    ///
304    /// let node = RaftNode::new(RaftConfig::new(1, [2, 3]));
305    /// assert_eq!(node.role(), Role::Follower);
306    /// assert_eq!(node.term(), 0);
307    /// ```
308    #[must_use]
309    pub fn new(config: RaftConfig) -> Self {
310        Self::with_log(config, MemoryLog::new())
311    }
312}
313
314impl<L: RaftLog> RaftNode<L> {
315    /// Creates a node from `config` backed by a caller-supplied `log`.
316    ///
317    /// This is the Tier-3 entry point: provide any [`RaftLog`] implementation —
318    /// for example a durable, `wal-db`-backed store (`v0.4`). The node adopts
319    /// the log's persisted [`HardState`](crate::HardState) on construction, so a
320    /// store recovered from disk resumes in the term it last persisted and with
321    /// the vote it last cast.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// use raft_io::{MemoryLog, RaftConfig, RaftNode};
327    ///
328    /// let node = RaftNode::with_log(RaftConfig::single(1), MemoryLog::new());
329    /// assert_eq!(node.id(), 1);
330    /// ```
331    #[must_use]
332    pub fn with_log(config: RaftConfig, log: L) -> Self {
333        let hard = log.hard_state();
334        // A recovered snapshot covers committed, already-applied state: start
335        // commit and applied at its boundary so those entries are not re-emitted.
336        // The application restores its state machine from `log.snapshot()`.
337        let base = log.snapshot_index();
338
339        // Bootstrap configuration: the snapshot's recorded membership if it has
340        // one, otherwise this node plus its configured peers.
341        let bootstrap = sorted_members(config.peers.iter().copied().chain([config.id]));
342        let base_config = match log.snapshot() {
343            Some(s) if !s.config.is_empty() => s.config,
344            _ => bootstrap,
345        };
346        // The effective configuration is the latest config entry in the live log,
347        // or `base_config` if there is none.
348        let mut voters = base_config.clone();
349        let mut config_index = 0;
350        let mut i = log.last_index();
351        while i > base {
352            if let Some(members) = log.entry(i).and_then(|e| e.members()) {
353                voters = members;
354                config_index = i;
355                break;
356            }
357            i -= 1;
358        }
359
360        let mut rng = Rng::new(config.seed);
361        let election_timeout =
362            rng.gen_range(config.election_timeout_min, config.election_timeout_max);
363        Self {
364            id: config.id,
365            voters,
366            base_config,
367            config_index,
368            election_timeout_min: config.election_timeout_min,
369            election_timeout_max: config.election_timeout_max,
370            heartbeat_interval: config.heartbeat_interval,
371            max_batch: config.max_batch,
372            snapshot_threshold: config.snapshot_threshold,
373            log,
374            role: Role::Follower,
375            current_term: hard.term,
376            voted_for: hard.voted_for,
377            leader_id: None,
378            commit_index: base,
379            last_applied: base,
380            election_elapsed: 0,
381            heartbeat_elapsed: 0,
382            election_timeout,
383            votes: Vec::new(),
384            pre_voting: false,
385            pre_votes: Vec::new(),
386            progress: Vec::new(),
387            snapshot_hinted_at: base,
388            transfer_target: None,
389            rng,
390        }
391    }
392
393    // ---- accessors -------------------------------------------------------
394
    /// Returns this node's id, as given by the configuration it was built from.
    #[inline]
    #[must_use]
    pub fn id(&self) -> NodeId {
        self.id
    }
401
    /// Returns the role the node currently plays.
    ///
    /// A freshly constructed node is a [`Follower`](Role::Follower).
    #[inline]
    #[must_use]
    pub fn role(&self) -> Role {
        self.role
    }
408
    /// Returns `true` if the node's current role is [`Leader`](Role::Leader).
    ///
    /// # Examples
    ///
    /// ```
    /// use raft_io::{Event, RaftConfig, RaftNode};
    ///
    /// let mut node = RaftNode::new(RaftConfig::single(1));
    /// while !node.is_leader() {
    ///     let _ = node.step(Event::Tick).unwrap();
    /// }
    /// assert!(node.is_leader());
    /// ```
    #[inline]
    #[must_use]
    pub fn is_leader(&self) -> bool {
        self.role == Role::Leader
    }
427
    /// Returns the node's current term.
    ///
    /// On construction this is the term stored in the log's persisted hard
    /// state; starting an election increments it.
    #[inline]
    #[must_use]
    pub fn term(&self) -> Term {
        self.current_term
    }
434
    /// Returns the leader the node currently recognises, if any.
    ///
    /// This is `None` right after construction and after the node opens a
    /// pre-vote round or starts an election; a node that becomes leader reports
    /// its own id.
    #[inline]
    #[must_use]
    pub fn leader(&self) -> Option<NodeId> {
        self.leader_id
    }
441
    /// Returns the highest log index known to be committed.
    ///
    /// A new node starts with this at the log's snapshot index.
    #[inline]
    #[must_use]
    pub fn commit_index(&self) -> Index {
        self.commit_index
    }
448
    /// Returns the highest log index the node has applied.
    ///
    /// A new node starts with this at the log's snapshot index, because a
    /// recovered snapshot covers state that was already applied.
    #[inline]
    #[must_use]
    pub fn last_applied(&self) -> Index {
        self.last_applied
    }
455
    /// Returns a shared reference to the underlying log.
    ///
    /// The reference is read-only; the log is only mutated by the node itself
    /// while it handles an event.
    ///
    /// # Examples
    ///
    /// ```
    /// use raft_io::{RaftConfig, RaftNode, RaftLog};
    ///
    /// let node = RaftNode::new(RaftConfig::single(1));
    /// assert_eq!(node.log().last_index(), 0);
    /// ```
    #[inline]
    #[must_use]
    pub fn log(&self) -> &L {
        &self.log
    }
471
    /// Returns the current voting membership of the cluster.
    ///
    /// This reflects the latest configuration the node has in its log, which it
    /// adopts as soon as the configuration entry is appended (before it commits).
    /// The slice includes this node's own id whenever the node is a voter.
    ///
    /// # Examples
    ///
    /// ```
    /// use raft_io::{RaftConfig, RaftNode};
    ///
    /// let node = RaftNode::new(RaftConfig::new(1, [2, 3]));
    /// assert_eq!(node.members(), &[1, 2, 3]);
    /// ```
    #[inline]
    #[must_use]
    pub fn members(&self) -> &[NodeId] {
        &self.voters
    }
490
491    // ---- configuration ---------------------------------------------------
492
    /// The number of votes (or replicas) that form a majority of the current
    /// voting membership: more than half of `voters`, e.g. `1` of 1, `2` of 2 or
    /// 3, `3` of 4 or 5. The count is taken over `voters` whether or not this
    /// node is one of them.
    #[inline]
    fn quorum(&self) -> usize {
        self.voters.len() / 2 + 1
    }
499
    /// Whether this node is a voting member of the current configuration, i.e.
    /// whether its own id appears in `voters`. `tick` uses this to keep a
    /// non-member from campaigning.
    #[inline]
    fn is_voter(&self) -> bool {
        self.voters.contains(&self.id)
    }
505
506    /// Adopts `voters` as the new configuration (established by the entry at
507    /// `config_index`, or `0` for the base configuration). Rebuilds leader
508    /// progress for the new peer set and emits [`Action::MembershipChanged`] if
509    /// the membership actually changed.
510    fn set_config(&mut self, voters: Vec<NodeId>, config_index: Index, actions: &mut Vec<Action>) {
511        let changed = voters != self.voters;
512        self.voters = voters;
513        self.config_index = config_index;
514        if self.role == Role::Leader {
515            self.rebuild_progress();
516        }
517        if changed {
518            actions.push(Action::MembershipChanged {
519                members: self.voters.clone(),
520            });
521        }
522    }
523
524    /// Scans the live log for the latest configuration entry and adopts it (or
525    /// the base configuration if there is none). Used after a truncation or a
526    /// snapshot install, where the in-effect configuration may have moved.
527    fn refresh_config(&mut self, actions: &mut Vec<Action>) {
528        let base = self.log.snapshot_index();
529        let mut voters = self.base_config.clone();
530        let mut config_index = 0;
531        let mut i = self.log.last_index();
532        while i > base {
533            if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
534                voters = members;
535                config_index = i;
536                break;
537            }
538            i -= 1;
539        }
540        self.set_config(voters, config_index, actions);
541    }
542
543    /// Rebuilds leader replication progress for the current peer set, preserving
544    /// the match/next/state of peers that remain.
545    fn rebuild_progress(&mut self) {
546        let next = self.log.last_index() + 1;
547        let old = core::mem::take(&mut self.progress);
548        self.progress = self
549            .voters
550            .iter()
551            .filter(|&&id| id != self.id)
552            .map(|&id| {
553                old.iter()
554                    .find(|p| p.id == id)
555                    .copied()
556                    .unwrap_or(Progress {
557                        id,
558                        next_index: next,
559                        match_index: 0,
560                        state: ProgressState::Probe,
561                    })
562            })
563            .collect();
564    }
565
    /// Returns the index of `id` in the leader progress table, if present.
    ///
    /// `None` means no progress entry tracks `id` — always the case while the
    /// table is empty, and for this node's own id, which `rebuild_progress`
    /// never adds.
    fn progress_index(&self, id: NodeId) -> Option<usize> {
        self.progress.iter().position(|p| p.id == id)
    }
570
571    // ---- the step function ----------------------------------------------
572
    /// Advances the state machine by one [`Event`] and returns the resulting
    /// [`Action`]s.
    ///
    /// This is the only way to drive a node. Feed it ticks to let time pass,
    /// inbound messages as they arrive, and client or application requests; act
    /// on every returned action, in order. The call is deterministic: the same
    /// node state and the same event always produce the same actions.
    ///
    /// # Errors
    ///
    /// - [`Error::NotLeader`] if the event is [`Event::Propose`] and this node
    ///   is not the leader, or is a leader with a leadership transfer in
    ///   progress; the error carries the known leader so the caller can
    ///   redirect. Per the [`Event`] documentation, [`Event::AddServer`] and
    ///   [`Event::RemoveServer`] are likewise rejected on a non-leader.
    /// - [`Error::ConfigInProgress`] if, per the [`Event`] documentation, a
    ///   membership change is requested while a previous one is still
    ///   uncommitted.
    /// - [`Error::Storage`] if the underlying [`RaftLog`] fails on the
    ///   durability path. A storage failure here is fatal to the node.
    ///
    /// # Examples
    ///
    /// ```
    /// use raft_io::{Action, Event, RaftConfig, RaftNode};
    ///
    /// let mut node = RaftNode::new(RaftConfig::single(1));
    /// while !node.is_leader() {
    ///     let _ = node.step(Event::Tick).unwrap();
    /// }
    /// let actions = node.step(Event::Propose(b"set x 1".to_vec())).unwrap();
    /// assert!(actions.iter().any(|a| matches!(a, Action::Apply { .. })));
    /// ```
    pub fn step(&mut self, event: Event) -> Result<Vec<Action>> {
        // Pure dispatch: each variant has exactly one handler.
        match event {
            Event::Tick => self.tick(),
            Event::Message(message) => self.handle_message(message),
            Event::Propose(command) => self.propose(command),
            Event::Snapshot { index, data } => self.handle_snapshot_event(index, data),
            // Both membership changes share one handler: (to add, to remove).
            Event::AddServer(id) => self.change_membership(Some(id), None),
            Event::RemoveServer(id) => self.change_membership(None, Some(id)),
            Event::TransferLeadership(target) => self.transfer_leadership(target),
        }
    }
612
613    // ---- tick handling ---------------------------------------------------
614
615    fn tick(&mut self) -> Result<Vec<Action>> {
616        let mut actions = Vec::new();
617        match self.role {
618            Role::Follower | Role::Candidate => {
619                self.election_elapsed += 1;
620                // Only a voting member campaigns; a node not in the configuration
621                // (for example, removed, or not yet caught up) follows quietly. A
622                // timeout opens a pre-vote round rather than a real election, so a
623                // partitioned node cannot inflate its term (Raft §9.6).
624                if self.election_elapsed >= self.election_timeout && self.is_voter() {
625                    self.start_pre_vote(&mut actions)?;
626                }
627            }
628            Role::Leader => {
629                self.heartbeat_elapsed += 1;
630                if self.heartbeat_elapsed >= self.heartbeat_interval {
631                    self.heartbeat_elapsed = 0;
632                    self.replicate_to_all(&mut actions);
633                }
634            }
635        }
636        Ok(actions)
637    }
638
639    /// Opens a pre-vote round (Raft §9.6): asks peers whether they *would* vote
640    /// for this node at the next term, without incrementing the term or casting a
641    /// vote. A real election begins only once a quorum of pre-votes is collected.
642    ///
643    /// This is the disruption guard. A node partitioned from the cluster never
644    /// gathers a pre-vote majority, so its term never climbs; when it rejoins it
645    /// does not force the sitting leader to step down. The node stays a
646    /// [`Follower`](Role::Follower) for the duration of the round — it has not
647    /// truly campaigned.
648    fn start_pre_vote(&mut self, actions: &mut Vec<Action>) -> Result<()> {
649        self.role = Role::Follower;
650        self.leader_id = None;
651        self.pre_voting = true;
652        self.pre_votes.clear();
653        self.pre_votes.push(self.id);
654        self.reset_election_timer();
655
656        // A single-node cluster (or any where one grant is a majority) needs no
657        // probe: campaign for real at once.
658        if self.pre_votes.len() >= self.quorum() {
659            return self.start_election(false, actions);
660        }
661
662        let last_log_index = self.log.last_index();
663        let last_log_term = self.log.last_term();
664        let term = self.current_term + 1; // the hypothetical term, not adopted
665        let id = self.id;
666        for &peer in &self.voters {
667            if peer == id {
668                continue;
669            }
670            actions.push(Action::Send {
671                to: peer,
672                message: Message::PreVote(PreVote {
673                    term,
674                    candidate: id,
675                    last_log_index,
676                    last_log_term,
677                }),
678            });
679        }
680        Ok(())
681    }
682
683    fn start_election(&mut self, force: bool, actions: &mut Vec<Action>) -> Result<()> {
684        self.role = Role::Candidate;
685        self.current_term += 1;
686        self.voted_for = Some(self.id);
687        self.leader_id = None;
688        self.transfer_target = None;
689        self.pre_voting = false;
690        self.pre_votes.clear();
691        self.progress.clear();
692        self.votes.clear();
693        self.votes.push(self.id);
694        self.reset_election_timer();
695        self.persist_hard_state()?;
696
697        // A single-node cluster (or any cluster where one vote is a majority)
698        // wins immediately.
699        if self.votes.len() >= self.quorum() {
700            self.become_leader(actions);
701            return Ok(());
702        }
703
704        let last_log_index = self.log.last_index();
705        let last_log_term = self.log.last_term();
706        let term = self.current_term;
707        let id = self.id;
708        for &peer in &self.voters {
709            if peer == id {
710                continue;
711            }
712            actions.push(Action::Send {
713                to: peer,
714                message: Message::RequestVote(RequestVote {
715                    term,
716                    candidate: id,
717                    last_log_index,
718                    last_log_term,
719                    force,
720                }),
721            });
722        }
723        Ok(())
724    }
725
    /// Takes over as leader for the current term: records this node as the
    /// leader, restarts the heartbeat counter, drops any leadership-transfer and
    /// pre-vote state, rebuilds per-peer progress, sends an initial round of
    /// appends to every peer, and then tries to advance the commit index. The
    /// resulting sends (and anything the commit step emits) are appended to
    /// `actions`.
    fn become_leader(&mut self, actions: &mut Vec<Action>) {
        self.role = Role::Leader;
        self.leader_id = Some(self.id);
        self.heartbeat_elapsed = 0;
        self.transfer_target = None;
        self.pre_voting = false;
        self.pre_votes.clear();
        // Initialise per-peer progress for the current configuration: each
        // follower is assumed caught up (next = last + 1) and probed to find
        // where it actually is.
        self.rebuild_progress();
        // Assert authority at once with an initial round of appends, and
        // (single-node) commit anything outstanding from the current term.
        self.replicate_to_all(actions);
        self.advance_commit(actions);
    }
742
743    /// Sends an `AppendEntries` to every peer. On a heartbeat tick this both
744    /// asserts leadership (empty append to caught-up followers) and probes or
745    /// streams to those behind.
746    fn replicate_to_all(&mut self, actions: &mut Vec<Action>) {
747        for i in 0..self.progress.len() {
748            self.send_append(i, actions);
749        }
750    }
751
752    /// Streams freshly appended entries to peers already in `Replicate` state.
753    /// Probing peers are driven by replies and heartbeats instead, so a busy
754    /// proposer does not flood a lagging follower with redundant probes.
755    fn replicate_to_streaming(&mut self, actions: &mut Vec<Action>) {
756        for i in 0..self.progress.len() {
757            if self.progress[i].state == ProgressState::Replicate {
758                self.send_append(i, actions);
759            }
760        }
761    }
762
763    /// Builds and emits one `AppendEntries` for the peer at progress index `i`,
764    /// carrying up to `max_batch` entries from that peer's `next_index`. In
765    /// `Replicate` state a non-empty send advances `next_index` optimistically so
766    /// the next batch can follow without waiting for the reply (pipelining).
767    fn send_append(&mut self, i: usize, actions: &mut Vec<Action>) {
768        let next = self.progress[i].next_index;
769        // If the entry preceding `next` has been compacted away, the follower is
770        // too far behind to replicate from the log — send the snapshot instead.
771        if next <= self.log.snapshot_index() {
772            self.send_snapshot(i, actions);
773            return;
774        }
775
776        let peer = self.progress[i].id;
777        let state = self.progress[i].state;
778        let prev_log_index = next - 1;
779        let prev_log_term = self.log.term_at(prev_log_index).unwrap_or(0);
780
781        let last = self.log.last_index();
782        let entries = if last >= next {
783            let to = last.min(next + self.max_batch as Index - 1);
784            self.log.entries(next, to)
785        } else {
786            Vec::new()
787        };
788        let count = entries.len() as Index;
789
790        actions.push(Action::Send {
791            to: peer,
792            message: Message::AppendEntries(AppendEntries {
793                term: self.current_term,
794                leader: self.id,
795                prev_log_index,
796                prev_log_term,
797                entries,
798                leader_commit: self.commit_index,
799            }),
800        });
801
802        if count > 0 && state == ProgressState::Replicate {
803            self.progress[i].next_index = next + count;
804        }
805    }
806
807    /// Sends the current snapshot to peer index `i`. Used when the follower needs
808    /// an entry the leader has already compacted away. Progress stays in `Probe`
809    /// until the reply confirms the install, so it is not advanced here.
810    fn send_snapshot(&mut self, i: usize, actions: &mut Vec<Action>) {
811        if let Some(snapshot) = self.log.snapshot() {
812            self.progress[i].state = ProgressState::Probe;
813            actions.push(Action::Send {
814                to: self.progress[i].id,
815                message: Message::InstallSnapshot(InstallSnapshot {
816                    term: self.current_term,
817                    leader: self.id,
818                    snapshot,
819                }),
820            });
821        }
822    }
823
824    // ---- proposals -------------------------------------------------------
825
826    fn propose(&mut self, command: Vec<u8>) -> Result<Vec<Action>> {
827        if self.role != Role::Leader || self.transfer_target.is_some() {
828            return Err(Error::NotLeader {
829                leader: self.leader_id,
830            });
831        }
832        let index = self.log.last_index() + 1;
833        let entry = LogEntry::new(self.current_term, index, command);
834        self.log.append(core::slice::from_ref(&entry))?;
835        self.log.sync()?;
836
837        let mut actions = Vec::new();
838        // Stream the new entry to followers that are caught up; commit at once if
839        // a quorum already holds it (the single-node case).
840        self.replicate_to_streaming(&mut actions);
841        self.advance_commit(&mut actions);
842        Ok(actions)
843    }
844
845    /// Advances the commit index to the highest entry a quorum has stored.
846    ///
847    /// Counts, for each candidate index `n`, the leader plus every follower
848    /// whose `match_index` reaches `n`. Raft's safety rule (§5.4.2) is enforced
849    /// strictly: an entry is committed by counting replicas **only if it was
850    /// created in the current term**. Older-term entries ride along once a
851    /// current-term entry above them commits. A single-node cluster commits its
852    /// own current-term tail immediately (quorum of one).
853    fn advance_commit(&mut self, actions: &mut Vec<Action>) {
854        let last = self.log.last_index();
855        let quorum = self.quorum();
856        // The leader counts toward a quorum only while it is itself a voter; a
857        // leader being removed (no longer a voter) needs a majority of the
858        // remaining members before its own removal commits.
859        let leader_holds = usize::from(self.is_voter());
860        let mut new_commit = self.commit_index;
861        let mut n = last;
862        while n > self.commit_index {
863            match self.log.term_at(n) {
864                Some(term) if term == self.current_term => {
865                    let mut replicas = leader_holds;
866                    for p in &self.progress {
867                        if p.match_index >= n {
868                            replicas += 1;
869                        }
870                    }
871                    if replicas >= quorum {
872                        new_commit = n;
873                        break; // highest such index found
874                    }
875                }
876                // Terms never decrease down the log; once we pass below the
877                // current term there is no current-term entry left to commit.
878                Some(term) if term < self.current_term => break,
879                _ => {}
880            }
881            n -= 1;
882        }
883        if new_commit > self.commit_index {
884            self.commit_index = new_commit;
885            self.drain_applies(actions);
886            // A leader that has just committed its own removal steps down.
887            if self.role == Role::Leader
888                && !self.is_voter()
889                && self.config_index != 0
890                && self.commit_index >= self.config_index
891            {
892                self.step_down_to_follower();
893            }
894        }
895    }
896
897    /// Drops leadership without changing term, after committing a configuration
898    /// that no longer includes this node.
899    fn step_down_to_follower(&mut self) {
900        self.role = Role::Follower;
901        self.leader_id = None;
902        self.transfer_target = None;
903        self.pre_voting = false;
904        self.pre_votes.clear();
905        self.progress.clear();
906        self.votes.clear();
907    }
908
909    fn drain_applies(&mut self, actions: &mut Vec<Action>) {
910        while self.last_applied < self.commit_index {
911            self.last_applied += 1;
912            // Configuration entries are protocol bookkeeping, not application
913            // commands — they take effect on append and are never applied to the
914            // state machine. The applied index still advances over them.
915            if let Some(entry) = self.log.entry(self.last_applied) {
916                if entry.members().is_none() {
917                    actions.push(Action::Apply {
918                        index: entry.index,
919                        term: entry.term,
920                        command: entry.command,
921                    });
922                }
923            }
924        }
925        self.maybe_hint_snapshot(actions);
926    }
927
928    /// Emits a snapshot hint once the applied log has grown past the configured
929    /// threshold beyond the last snapshot. Fires at most once per crossing.
930    fn maybe_hint_snapshot(&mut self, actions: &mut Vec<Action>) {
931        if self.snapshot_threshold == 0 {
932            return;
933        }
934        let base = self.log.snapshot_index();
935        let grown = self.last_applied.saturating_sub(base) as usize;
936        if grown >= self.snapshot_threshold && self.last_applied > self.snapshot_hinted_at {
937            if let Some(term) = self.log.term_at(self.last_applied) {
938                self.snapshot_hinted_at = self.last_applied;
939                actions.push(Action::Snapshot {
940                    index: self.last_applied,
941                    term,
942                });
943            }
944        }
945    }
946
947    // ---- snapshots -------------------------------------------------------
948
949    /// Handles the application's snapshot of its state machine through `index`.
950    ///
951    /// Compacts the log up to `index` if the snapshot is valid: it must cover a
952    /// committed, already-applied index that is newer than any existing snapshot,
953    /// and the entry at `index` must still be present so its term is known. An
954    /// out-of-range or stale snapshot is ignored rather than treated as an error.
955    fn handle_snapshot_event(&mut self, index: Index, data: Vec<u8>) -> Result<Vec<Action>> {
956        if index > self.commit_index
957            || index > self.last_applied
958            || index <= self.log.snapshot_index()
959        {
960            return Ok(Vec::new());
961        }
962        let Some(term) = self.log.term_at(index) else {
963            return Ok(Vec::new());
964        };
965        // Record the configuration in effect at `index` so a node catching up
966        // from this snapshot — its config entries compacted — still knows the
967        // membership.
968        let config = self.config_at(index);
969        self.base_config = config.clone();
970        self.log
971            .apply_snapshot(&Snapshot::with_config(index, term, config, data))?;
972        self.log.sync()?;
973        if self.snapshot_hinted_at < index {
974            self.snapshot_hinted_at = index;
975        }
976        let mut actions = Vec::new();
977        self.refresh_config(&mut actions);
978        Ok(actions)
979    }
980
981    /// Returns the voting membership in effect at `index`: the latest live
982    /// configuration entry at or below `index`, or the base configuration.
983    fn config_at(&self, index: Index) -> Vec<NodeId> {
984        let base = self.log.snapshot_index();
985        let mut i = index.min(self.log.last_index());
986        while i > base {
987            if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
988                return members;
989            }
990            i -= 1;
991        }
992        self.base_config.clone()
993    }
994
995    // ---- message handling ------------------------------------------------
996
997    fn handle_message(&mut self, message: Message) -> Result<Vec<Action>> {
998        // Leader stickiness (Raft §4.2.3): ignore a `RequestVote` — not even
999        // adopting its term — while a leader we recognise is still active (we
1000        // heard from it within the minimum election timeout). This stops a
1001        // removed or partitioned server, which never hears heartbeats and so
1002        // keeps timing out, from disrupting the cluster with ever-higher terms. A
1003        // candidate (no recognised leader) is unaffected.
1004        if matches!(message, Message::RequestVote(ref rv) if !rv.force)
1005            && self.leader_id.is_some()
1006            && self.election_elapsed < self.election_timeout_min
1007        {
1008            return Ok(Vec::new());
1009        }
1010
1011        let mut actions = Vec::new();
1012
1013        // Pre-vote messages are hypothetical: they carry a term the sender has not
1014        // adopted, and neither side changes any persistent state for them. Handle
1015        // and return before the generic higher-term step-down below — a pre-vote
1016        // must never inflate our term, which is the whole point of the mechanism.
1017        let message = match message {
1018            Message::PreVote(pv) => {
1019                self.handle_pre_vote(pv, &mut actions);
1020                return Ok(actions);
1021            }
1022            Message::PreVoteReply(reply) => {
1023                self.handle_pre_vote_reply(reply, &mut actions)?;
1024                return Ok(actions);
1025            }
1026            other => other,
1027        };
1028
1029        // Any other message from a later term forces a step-down and term
1030        // adoption, before the message is interpreted in its own right.
1031        if message.term() > self.current_term {
1032            self.become_follower(message.term(), None)?;
1033        }
1034
1035        match message {
1036            Message::RequestVote(rv) => self.handle_request_vote(rv, &mut actions)?,
1037            Message::RequestVoteReply(reply) => self.handle_vote_reply(reply, &mut actions),
1038            Message::AppendEntries(ae) => self.handle_append_entries(ae, &mut actions)?,
1039            Message::AppendEntriesReply(reply) => self.handle_append_reply(reply, &mut actions),
1040            Message::InstallSnapshot(rpc) => self.handle_install_snapshot(rpc, &mut actions)?,
1041            Message::InstallSnapshotReply(reply) => {
1042                self.handle_install_snapshot_reply(reply, &mut actions);
1043            }
1044            Message::TimeoutNow(rpc) => self.handle_timeout_now(rpc, &mut actions)?,
1045            // Routed above, before the term step-down.
1046            Message::PreVote(_) | Message::PreVoteReply(_) => {}
1047        }
1048        Ok(actions)
1049    }
1050
1051    fn become_follower(&mut self, term: Term, leader: Option<NodeId>) -> Result<()> {
1052        let hard_state_changed = term > self.current_term;
1053        self.role = Role::Follower;
1054        if term > self.current_term {
1055            self.current_term = term;
1056            self.voted_for = None;
1057        }
1058        self.leader_id = leader;
1059        self.transfer_target = None;
1060        self.pre_voting = false;
1061        self.pre_votes.clear();
1062        self.votes.clear();
1063        self.progress.clear();
1064        if hard_state_changed {
1065            self.persist_hard_state()?;
1066        }
1067        Ok(())
1068    }
1069
1070    fn handle_request_vote(&mut self, rv: RequestVote, actions: &mut Vec<Action>) -> Result<()> {
1071        let mut granted = false;
1072        if rv.term >= self.current_term {
1073            let can_vote = match self.voted_for {
1074                None => true,
1075                Some(c) => c == rv.candidate,
1076            };
1077            let log_ok = self.candidate_log_up_to_date(rv.last_log_term, rv.last_log_index);
1078            if can_vote && log_ok {
1079                granted = true;
1080                self.voted_for = Some(rv.candidate);
1081                self.persist_hard_state()?;
1082                self.reset_election_timer();
1083            }
1084        }
1085        actions.push(Action::Send {
1086            to: rv.candidate,
1087            message: Message::RequestVoteReply(RequestVoteReply {
1088                term: self.current_term,
1089                vote_granted: granted,
1090                from: self.id,
1091            }),
1092        });
1093        Ok(())
1094    }
1095
1096    /// The election restriction: a candidate's log must be at least as
1097    /// up to date as ours — a later last term wins, or an equal last term with
1098    /// at least as high an index.
1099    fn candidate_log_up_to_date(&self, cand_last_term: Term, cand_last_index: Index) -> bool {
1100        let my_term = self.log.last_term();
1101        let my_index = self.log.last_index();
1102        cand_last_term > my_term || (cand_last_term == my_term && cand_last_index >= my_index)
1103    }
1104
1105    fn handle_vote_reply(&mut self, reply: RequestVoteReply, actions: &mut Vec<Action>) {
1106        if self.role != Role::Candidate || reply.term != self.current_term {
1107            return;
1108        }
1109        if reply.vote_granted && !self.votes.contains(&reply.from) {
1110            self.votes.push(reply.from);
1111            if self.votes.len() >= self.quorum() {
1112                self.become_leader(actions);
1113            }
1114        }
1115    }
1116
1117    /// Answers a peer's pre-vote probe. Grants only if we recognise no active
1118    /// leader (the same stickiness that guards a real vote), the probe's
1119    /// hypothetical term is not behind ours, and the candidate's log is at least
1120    /// as up to date as ours. A pre-vote changes no persistent state: we neither
1121    /// adopt its term nor record a vote, so a peer may grant several pre-votes in
1122    /// the same term — only the real [`RequestVote`] consumes the single vote.
1123    fn handle_pre_vote(&mut self, pv: PreVote, actions: &mut Vec<Action>) {
1124        let have_active_leader =
1125            self.leader_id.is_some() && self.election_elapsed < self.election_timeout_min;
1126        let granted = pv.term >= self.current_term
1127            && !have_active_leader
1128            && self.candidate_log_up_to_date(pv.last_log_term, pv.last_log_index);
1129        actions.push(Action::Send {
1130            to: pv.candidate,
1131            message: Message::PreVoteReply(PreVoteReply {
1132                term: self.current_term,
1133                vote_granted: granted,
1134                from: self.id,
1135            }),
1136        });
1137    }
1138
1139    /// Counts a pre-vote reply. A reply carrying a higher term means real activity
1140    /// we are behind on, so we abandon the round and adopt that term. Otherwise a
1141    /// grant adds to the tally, and once a quorum is reached we begin the real
1142    /// election — only here does the term finally advance.
1143    fn handle_pre_vote_reply(
1144        &mut self,
1145        reply: PreVoteReply,
1146        actions: &mut Vec<Action>,
1147    ) -> Result<()> {
1148        if !self.pre_voting {
1149            return Ok(());
1150        }
1151        if reply.term > self.current_term {
1152            self.pre_voting = false;
1153            self.pre_votes.clear();
1154            return self.become_follower(reply.term, None);
1155        }
1156        if reply.vote_granted && !self.pre_votes.contains(&reply.from) {
1157            self.pre_votes.push(reply.from);
1158            if self.pre_votes.len() >= self.quorum() {
1159                self.start_election(false, actions)?;
1160            }
1161        }
1162        Ok(())
1163    }
1164
1165    fn handle_append_entries(
1166        &mut self,
1167        ae: AppendEntries,
1168        actions: &mut Vec<Action>,
1169    ) -> Result<()> {
1170        let mut reply = AppendEntriesReply {
1171            term: self.current_term,
1172            success: false,
1173            from: self.id,
1174            match_index: 0,
1175            conflict_index: 0,
1176            conflict_term: 0,
1177        };
1178
1179        // Reject a stale leader outright, telling it our (higher) term.
1180        if ae.term < self.current_term {
1181            actions.push(Action::Send {
1182                to: ae.leader,
1183                message: Message::AppendEntriesReply(reply),
1184            });
1185            return Ok(());
1186        }
1187
1188        // A valid leader for our term: accept its authority and reset the timer.
1189        // Recognising a leader ends any pre-vote round in progress.
1190        self.role = Role::Follower;
1191        self.leader_id = Some(ae.leader);
1192        self.pre_voting = false;
1193        self.reset_election_timer();
1194
1195        // The entries up to `prev_log_index` are already subsumed by our
1196        // snapshot. This happens for a stale or reordered RPC after we compacted;
1197        // we cannot verify a compacted `prev_log_term`, so we simply report that
1198        // we already hold everything through the snapshot boundary and let the
1199        // leader resend the tail with a `prev` we can check.
1200        let base = self.log.snapshot_index();
1201        if ae.prev_log_index < base {
1202            if ae.leader_commit > self.commit_index {
1203                self.commit_index = ae.leader_commit.min(base);
1204                self.drain_applies(actions);
1205            }
1206            reply.success = true;
1207            reply.match_index = base;
1208            actions.push(Action::Send {
1209                to: ae.leader,
1210                message: Message::AppendEntriesReply(reply),
1211            });
1212            return Ok(());
1213        }
1214
1215        // Log-consistency check at prev_log_index. `term_at` answers `Some(0)` at
1216        // the index-0 sentinel and `Some(base_term)` at the snapshot boundary, so
1217        // both the head-of-log and post-compaction cases fall out naturally.
1218        let prev_ok = self.log.term_at(ae.prev_log_index) == Some(ae.prev_log_term);
1219        if !prev_ok {
1220            // Supply a conflict hint so the leader can skip back a whole term.
1221            let last = self.log.last_index();
1222            if ae.prev_log_index > last {
1223                reply.conflict_index = last + 1;
1224                reply.conflict_term = 0;
1225            } else {
1226                let conflict_term = self.log.term_at(ae.prev_log_index).unwrap_or(0);
1227                reply.conflict_term = conflict_term;
1228                reply.conflict_index = self.first_index_of_term(conflict_term, ae.prev_log_index);
1229            }
1230            actions.push(Action::Send {
1231                to: ae.leader,
1232                message: Message::AppendEntriesReply(reply),
1233            });
1234            return Ok(());
1235        }
1236
1237        // The logs match up to prev_log_index. Append the new entries, resolving
1238        // any divergent tail, and report how far we now agree.
1239        let (match_index, truncated) = if ae.entries.is_empty() {
1240            (ae.prev_log_index, false)
1241        } else {
1242            self.append_from_leader(&ae.entries)?
1243        };
1244
1245        // A configuration entry in the batch (or a truncation that removed one)
1246        // may have changed the membership we follow under; recompute it.
1247        if truncated || ae.entries.iter().any(|e| e.members().is_some()) {
1248            self.refresh_config(actions);
1249        }
1250
1251        if ae.leader_commit > self.commit_index {
1252            // Never commit past the last entry this RPC actually covers.
1253            self.commit_index = ae.leader_commit.min(match_index);
1254            self.drain_applies(actions);
1255        }
1256
1257        reply.success = true;
1258        reply.match_index = match_index;
1259        actions.push(Action::Send {
1260            to: ae.leader,
1261            message: Message::AppendEntriesReply(reply),
1262        });
1263        Ok(())
1264    }
1265
1266    /// Reconciles the leader's entries into the follower's log.
1267    ///
1268    /// Skips a prefix that already matches (same index and term), truncates the
1269    /// first divergent entry and everything after it, then appends the rest. The
1270    /// protocol guarantees a leader never sends entries that conflict below the
1271    /// commit index, so this never discards committed state. Returns the index of
1272    /// the last entry now stored from this batch and whether a truncation
1273    /// occurred (which the caller uses to know the configuration may have moved).
1274    fn append_from_leader(&mut self, entries: &[LogEntry]) -> Result<(Index, bool)> {
1275        let mut i = 0;
1276        let mut truncated = false;
1277        while i < entries.len() {
1278            let entry = &entries[i];
1279            match self.log.term_at(entry.index) {
1280                Some(term) if term == entry.term => i += 1,
1281                Some(_) => {
1282                    // Divergence: drop the conflicting tail and stop scanning.
1283                    self.log.truncate(entry.index)?;
1284                    truncated = true;
1285                    break;
1286                }
1287                None => break, // beyond our log; append from here on
1288            }
1289        }
1290        if i < entries.len() {
1291            self.log.append(&entries[i..])?;
1292            self.log.sync()?;
1293        }
1294        Ok((entries[entries.len() - 1].index, truncated))
1295    }
1296
1297    fn handle_append_reply(&mut self, reply: AppendEntriesReply, actions: &mut Vec<Action>) {
1298        if self.role != Role::Leader || reply.term != self.current_term {
1299            return; // not leader, or a stale reply from another term
1300        }
1301        let Some(i) = self.progress_index(reply.from) else {
1302            return;
1303        };
1304
1305        if reply.success {
1306            // match_index only ever advances, tolerating reordered duplicates.
1307            if reply.match_index > self.progress[i].match_index {
1308                self.progress[i].match_index = reply.match_index;
1309            }
1310            let want_next = self.progress[i].match_index + 1;
1311            if want_next > self.progress[i].next_index {
1312                self.progress[i].next_index = want_next;
1313            }
1314            self.progress[i].state = ProgressState::Replicate;
1315            self.advance_commit(actions);
1316            // A leadership transfer waits for the target to catch up; once it
1317            // matches the log, tell it to campaign immediately.
1318            self.maybe_send_timeout_now(reply.from, actions);
1319            // The step-down above may have cleared progress; guard the index.
1320            if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
1321                self.send_append(i, actions);
1322            }
1323        } else {
1324            // Rejected: backtrack next_index using the follower's conflict hint,
1325            // drop to Probe, and retry at once.
1326            let next = self.progress[i].next_index;
1327            let matched = self.progress[i].match_index;
1328            self.progress[i].next_index =
1329                self.rejected_next(next, matched, reply.conflict_index, reply.conflict_term);
1330            self.progress[i].state = ProgressState::Probe;
1331            self.send_append(i, actions);
1332        }
1333    }
1334
1335    /// Installs a snapshot shipped by the leader, on a follower too far behind to
1336    /// replicate from the log. The state machine is reset via
1337    /// [`Action::RestoreSnapshot`]; tail replication resumes afterward.
1338    fn handle_install_snapshot(
1339        &mut self,
1340        rpc: InstallSnapshot,
1341        actions: &mut Vec<Action>,
1342    ) -> Result<()> {
1343        if rpc.term < self.current_term {
1344            actions.push(Action::Send {
1345                to: rpc.leader,
1346                message: Message::InstallSnapshotReply(InstallSnapshotReply {
1347                    term: self.current_term,
1348                    from: self.id,
1349                    last_index: 0,
1350                }),
1351            });
1352            return Ok(());
1353        }
1354
1355        // A valid leader for our term: accept its authority.
1356        self.role = Role::Follower;
1357        self.leader_id = Some(rpc.leader);
1358        self.pre_voting = false;
1359        self.reset_election_timer();
1360
1361        let snap_index = rpc.snapshot.index;
1362        let snap_term = rpc.snapshot.term;
1363        // Install only if the snapshot advances us beyond what we already hold: a
1364        // follower that has caught up further via normal replication, or that
1365        // holds a newer snapshot, must not be dragged backwards by a stale or
1366        // reordered `InstallSnapshot`.
1367        if snap_index > self.log.snapshot_index() && snap_index > self.commit_index {
1368            // Adopt the configuration the snapshot carries before installing it,
1369            // so a node catching up this way knows the membership.
1370            if !rpc.snapshot.config.is_empty() {
1371                self.base_config = rpc.snapshot.config.clone();
1372            }
1373            self.log.apply_snapshot(&rpc.snapshot)?;
1374            self.log.sync()?;
1375            self.commit_index = snap_index;
1376            self.last_applied = snap_index;
1377            if snap_index > self.snapshot_hinted_at {
1378                self.snapshot_hinted_at = snap_index;
1379            }
1380            self.refresh_config(actions);
1381            actions.push(Action::RestoreSnapshot {
1382                index: snap_index,
1383                term: snap_term,
1384                data: rpc.snapshot.data,
1385            });
1386        }
1387
1388        // Report how far we now agree: our snapshot boundary, or the snapshot's
1389        // index if we already cover it as committed.
1390        let last_index = self
1391            .log
1392            .snapshot_index()
1393            .max(snap_index.min(self.commit_index));
1394        actions.push(Action::Send {
1395            to: rpc.leader,
1396            message: Message::InstallSnapshotReply(InstallSnapshotReply {
1397                term: self.current_term,
1398                from: self.id,
1399                last_index,
1400            }),
1401        });
1402        Ok(())
1403    }
1404
1405    /// Handles a follower's acknowledgement of an installed snapshot: advance its
1406    /// progress to the snapshot index and resume tail replication.
1407    fn handle_install_snapshot_reply(
1408        &mut self,
1409        reply: InstallSnapshotReply,
1410        actions: &mut Vec<Action>,
1411    ) {
1412        if self.role != Role::Leader || reply.term != self.current_term {
1413            return;
1414        }
1415        let Some(i) = self.progress_index(reply.from) else {
1416            return;
1417        };
1418        if reply.last_index > self.progress[i].match_index {
1419            self.progress[i].match_index = reply.last_index;
1420        }
1421        self.progress[i].next_index = self.progress[i].match_index + 1;
1422        self.progress[i].state = ProgressState::Replicate;
1423        self.advance_commit(actions);
1424        self.maybe_send_timeout_now(reply.from, actions);
1425        if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
1426            self.send_append(i, actions);
1427        }
1428    }
1429
1430    /// Computes the `next_index` to retry after a rejection, using the conflict
1431    /// hint. Prefers to jump just past the leader's last entry of the conflict
1432    /// term; otherwise falls back to the follower's suggested index. The result
1433    /// never rises (a rejection only backtracks) and never drops at or below the
1434    /// confirmed `match_index`, which bounds probing and guarantees it converges.
1435    fn rejected_next(
1436        &self,
1437        current_next: Index,
1438        match_index: Index,
1439        conflict_index: Index,
1440        conflict_term: Term,
1441    ) -> Index {
1442        let floor = match_index + 1;
1443        let mut target = conflict_index.max(1);
1444        if conflict_term > 0 {
1445            if let Some(last) = self.last_index_of_term(conflict_term) {
1446                target = last + 1;
1447            }
1448        }
1449        let ceil = current_next.saturating_sub(1).max(floor);
1450        target.clamp(floor, ceil)
1451    }
1452
1453    /// First index of the contiguous run of `term` ending at `upto`.
1454    fn first_index_of_term(&self, term: Term, upto: Index) -> Index {
1455        let mut i = upto;
1456        while i > 1 && self.log.term_at(i - 1) == Some(term) {
1457            i -= 1;
1458        }
1459        i
1460    }
1461
1462    /// Highest index in the leader's log whose entry has `term`, if any. Relies
1463    /// on terms being non-decreasing down the log to stop early.
1464    fn last_index_of_term(&self, term: Term) -> Option<Index> {
1465        let mut i = self.log.last_index();
1466        while i >= 1 {
1467            match self.log.term_at(i) {
1468                Some(t) if t == term => return Some(i),
1469                Some(t) if t < term => return None,
1470                _ => {}
1471            }
1472            i -= 1;
1473        }
1474        None
1475    }
1476
1477    // ---- membership changes ----------------------------------------------
1478
1479    /// Appends a configuration entry that adds and/or removes a single voter,
1480    /// adopting the new membership immediately.
1481    ///
1482    /// One change at a time: a request made while a previous configuration entry
1483    /// is still uncommitted is rejected with [`Error::ConfigInProgress`]. A no-op
1484    /// change (adding a member already present, removing one absent) succeeds
1485    /// without appending anything.
1486    fn change_membership(
1487        &mut self,
1488        add: Option<NodeId>,
1489        remove: Option<NodeId>,
1490    ) -> Result<Vec<Action>> {
1491        if self.role != Role::Leader || self.transfer_target.is_some() {
1492            return Err(Error::NotLeader {
1493                leader: self.leader_id,
1494            });
1495        }
1496        // The previous configuration change must have committed first.
1497        if self.config_index > self.commit_index {
1498            return Err(Error::ConfigInProgress);
1499        }
1500
1501        let mut members = self.voters.clone();
1502        if let Some(id) = add {
1503            if !members.contains(&id) {
1504                members.push(id);
1505            }
1506        }
1507        if let Some(id) = remove {
1508            members.retain(|&m| m != id);
1509        }
1510        let members = sorted_members(members);
1511        if members == self.voters {
1512            return Ok(Vec::new()); // nothing to do
1513        }
1514
1515        let index = self.log.last_index() + 1;
1516        let entry = LogEntry::config(self.current_term, index, &members);
1517        self.log.append(core::slice::from_ref(&entry))?;
1518        self.log.sync()?;
1519
1520        let mut actions = Vec::new();
1521        // Adopt the new configuration at once (Raft applies a config entry on
1522        // append, not on commit), rebuilding progress and announcing the change.
1523        self.set_config(members, index, &mut actions);
1524        self.replicate_to_all(&mut actions);
1525        self.advance_commit(&mut actions);
1526        Ok(actions)
1527    }
1528
1529    // ---- leadership transfer ---------------------------------------------
1530
1531    /// Begins a leadership transfer to `target`: catch it up, then signal it to
1532    /// campaign. A no-op on a non-leader, for a non-voting target, or when the
1533    /// target is this node.
1534    fn transfer_leadership(&mut self, target: NodeId) -> Result<Vec<Action>> {
1535        if self.role != Role::Leader || target == self.id || !self.voters.contains(&target) {
1536            return Ok(Vec::new());
1537        }
1538        self.transfer_target = Some(target);
1539        let mut actions = Vec::new();
1540        // If the target is already caught up, hand off now; otherwise bring it up
1541        // to date and the catch-up replies will trigger the hand-off.
1542        self.maybe_send_timeout_now(target, &mut actions);
1543        if self.transfer_target.is_some() {
1544            if let Some(i) = self.progress_index(target) {
1545                self.send_append(i, &mut actions);
1546            }
1547        }
1548        Ok(actions)
1549    }
1550
1551    /// Sends a `TimeoutNow` to `target` if a transfer to it is pending and it has
1552    /// caught up to the leader's last log index.
1553    fn maybe_send_timeout_now(&mut self, target: NodeId, actions: &mut Vec<Action>) {
1554        if self.transfer_target != Some(target) {
1555            return;
1556        }
1557        let caught_up = self
1558            .progress_index(target)
1559            .is_some_and(|i| self.progress[i].match_index >= self.log.last_index());
1560        if caught_up {
1561            self.transfer_target = None;
1562            actions.push(Action::Send {
1563                to: target,
1564                message: Message::TimeoutNow(TimeoutNow {
1565                    term: self.current_term,
1566                    leader: self.id,
1567                }),
1568            });
1569        }
1570    }
1571
1572    /// Handles a `TimeoutNow`: a voter starts an election immediately, taking
1573    /// over from a leader that is handing off.
1574    fn handle_timeout_now(&mut self, rpc: TimeoutNow, actions: &mut Vec<Action>) -> Result<()> {
1575        // Ignore a stale signal or one aimed at a node no longer in the cluster.
1576        if rpc.term < self.current_term || !self.is_voter() {
1577            return Ok(());
1578        }
1579        // A forced election: peers honour our vote request despite stickiness.
1580        self.start_election(true, actions)
1581    }
1582
1583    // ---- shared helpers --------------------------------------------------
1584
1585    fn persist_hard_state(&mut self) -> Result<()> {
1586        self.log.set_hard_state(HardState {
1587            term: self.current_term,
1588            voted_for: self.voted_for,
1589        })?;
1590        self.log.sync()
1591    }
1592
1593    fn reset_election_timer(&mut self) {
1594        self.election_elapsed = 0;
1595        self.election_timeout = self
1596            .rng
1597            .gen_range(self.election_timeout_min, self.election_timeout_max);
1598    }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603    // Test setup uses unwrap/expect where a failure cannot be meaningfully
1604    // handled and should fail the test loudly. REPS permits this in test code.
1605    #![allow(clippy::unwrap_used, clippy::expect_used)]
1606
1607    use super::*;
1608
1609    fn drive_to_leader(node: &mut RaftNode) {
1610        for _ in 0..1_000 {
1611            if node.is_leader() {
1612                return;
1613            }
1614            let _ = node.step(Event::Tick).expect("tick");
1615        }
1616        panic!("node never became leader");
1617    }
1618
1619    #[test]
1620    fn test_single_node_elects_itself() {
1621        let mut node = RaftNode::new(RaftConfig::single(1));
1622        drive_to_leader(&mut node);
1623        assert_eq!(node.role(), Role::Leader);
1624        assert_eq!(node.leader(), Some(1));
1625        assert_eq!(node.term(), 1);
1626    }
1627
1628    #[test]
1629    fn test_single_node_commits_proposal() {
1630        let mut node = RaftNode::new(RaftConfig::single(1));
1631        drive_to_leader(&mut node);
1632        let actions = node.step(Event::Propose(b"a".to_vec())).unwrap();
1633        assert_eq!(node.commit_index(), 1);
1634        assert_eq!(node.last_applied(), 1);
1635        let applied: Vec<_> = actions
1636            .iter()
1637            .filter(|a| matches!(a, Action::Apply { .. }))
1638            .collect();
1639        assert_eq!(applied.len(), 1);
1640    }
1641
1642    #[test]
1643    fn test_propose_to_follower_is_rejected() {
1644        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1645        let err = node.step(Event::Propose(b"a".to_vec())).unwrap_err();
1646        assert!(matches!(err, Error::NotLeader { .. }));
1647    }
1648
1649    #[test]
1650    fn test_candidate_pre_votes_then_requests_votes_from_peers() {
1651        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1652        let mut sends = Vec::new();
1653        for _ in 0..1_000 {
1654            let actions = node.step(Event::Tick).unwrap();
1655            if !actions.is_empty() {
1656                sends = actions;
1657                break;
1658            }
1659        }
1660        // A timeout opens a pre-vote round (the node is not yet a candidate, and
1661        // its term has not moved) — PreVotes go to both peers.
1662        assert_eq!(node.role(), Role::Follower);
1663        assert_eq!(node.term(), 0);
1664        let pre_targets: Vec<NodeId> = sends
1665            .iter()
1666            .filter_map(|a| match a {
1667                Action::Send {
1668                    to,
1669                    message: Message::PreVote(_),
1670                } => Some(*to),
1671                _ => None,
1672            })
1673            .collect();
1674        assert_eq!(pre_targets.len(), 2);
1675        assert!(pre_targets.contains(&2) && pre_targets.contains(&3));
1676
1677        // Granting one pre-vote reaches a quorum and starts the real election:
1678        // now the term advances and RequestVotes go out to both peers.
1679        let actions = node
1680            .step(Event::Message(Message::PreVoteReply(PreVoteReply {
1681                term: node.term(),
1682                vote_granted: true,
1683                from: 2,
1684            })))
1685            .unwrap();
1686        assert_eq!(node.role(), Role::Candidate);
1687        assert_eq!(node.term(), 1);
1688        let targets: Vec<NodeId> = actions
1689            .iter()
1690            .filter_map(|a| match a {
1691                Action::Send {
1692                    to,
1693                    message: Message::RequestVote(_),
1694                } => Some(*to),
1695                _ => None,
1696            })
1697            .collect();
1698        assert_eq!(targets.len(), 2);
1699        assert!(targets.contains(&2) && targets.contains(&3));
1700    }
1701
1702    #[test]
1703    fn test_pre_vote_does_not_advance_term_or_persist() {
1704        // Repeated timeouts with no peers reachable must not inflate the term —
1705        // the disruption guard. A lone voter in a 3-node config never gets a
1706        // pre-vote quorum, so it pre-votes forever at term 0.
1707        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(2, 2));
1708        for _ in 0..50 {
1709            let _ = node.step(Event::Tick).unwrap();
1710        }
1711        assert_eq!(node.role(), Role::Follower);
1712        assert_eq!(node.term(), 0);
1713        assert_eq!(node.log().hard_state().term, 0);
1714    }
1715
1716    #[test]
1717    fn test_pre_vote_granted_when_no_leader_and_log_ok() {
1718        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1719        let actions = node
1720            .step(Event::Message(Message::PreVote(PreVote {
1721                term: 1,
1722                candidate: 2,
1723                last_log_index: 0,
1724                last_log_term: 0,
1725            })))
1726            .unwrap();
1727        let granted = actions.iter().any(|a| {
1728            matches!(
1729                a,
1730                Action::Send { message: Message::PreVoteReply(r), .. } if r.vote_granted
1731            )
1732        });
1733        assert!(granted);
1734        // A pre-vote leaves the responder's term and vote untouched.
1735        assert_eq!(node.term(), 0);
1736        assert_eq!(node.log().hard_state().voted_for, None);
1737    }
1738
1739    #[test]
1740    fn test_pre_vote_denied_for_behind_log() {
1741        // Responder holds a term-2 entry; a candidate with an empty log is behind.
1742        let mut log = MemoryLog::new();
1743        log.append(&[entry(2, 1)]).unwrap();
1744        let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
1745        let actions = node
1746            .step(Event::Message(Message::PreVote(PreVote {
1747                term: 1,
1748                candidate: 2,
1749                last_log_index: 0,
1750                last_log_term: 0,
1751            })))
1752            .unwrap();
1753        let granted = actions.iter().any(|a| {
1754            matches!(
1755                a,
1756                Action::Send { message: Message::PreVoteReply(r), .. } if r.vote_granted
1757            )
1758        });
1759        assert!(!granted);
1760    }
1761
1762    #[test]
1763    fn test_node_grants_one_vote_then_refuses_another_candidate() {
1764        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1765        let grant = |node: &mut RaftNode, candidate: NodeId| -> bool {
1766            let actions = node
1767                .step(Event::Message(Message::RequestVote(RequestVote {
1768                    term: 5,
1769                    candidate,
1770                    last_log_index: 0,
1771                    last_log_term: 0,
1772                    force: false,
1773                })))
1774                .unwrap();
1775            actions.iter().any(|a| {
1776                matches!(
1777                    a,
1778                    Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
1779                )
1780            })
1781        };
1782        assert!(grant(&mut node, 2));
1783        assert!(!grant(&mut node, 3)); // already voted for 2 in term 5
1784    }
1785
1786    #[test]
1787    fn test_higher_term_message_steps_node_down() {
1788        let mut node = RaftNode::new(RaftConfig::single(1));
1789        drive_to_leader(&mut node);
1790        let leader_term = node.term();
1791        let _ = node
1792            .step(Event::Message(Message::AppendEntries(AppendEntries {
1793                term: leader_term + 5,
1794                leader: 9,
1795                prev_log_index: 0,
1796                prev_log_term: 0,
1797                entries: Vec::new(),
1798                leader_commit: 0,
1799            })))
1800            .unwrap();
1801        assert_eq!(node.role(), Role::Follower);
1802        assert_eq!(node.term(), leader_term + 5);
1803        assert_eq!(node.leader(), Some(9));
1804    }
1805
1806    #[test]
1807    fn test_stale_term_request_vote_is_refused() {
1808        let mut node = RaftNode::new(RaftConfig::single(1));
1809        drive_to_leader(&mut node); // now in term 1+
1810        let term = node.term();
1811        let actions = node
1812            .step(Event::Message(Message::RequestVote(RequestVote {
1813                term: term - 1,
1814                candidate: 2,
1815                last_log_index: 99,
1816                last_log_term: 99,
1817                force: false,
1818            })))
1819            .unwrap();
1820        let granted = actions.iter().any(|a| {
1821            matches!(
1822                a,
1823                Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
1824            )
1825        });
1826        assert!(!granted);
1827    }
1828
1829    #[test]
1830    fn test_heartbeat_resets_follower_election_timer() {
1831        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5));
1832        // Tick a few times, then a heartbeat should keep it a follower.
1833        let _ = node.step(Event::Tick).unwrap();
1834        let _ = node.step(Event::Tick).unwrap();
1835        let _ = node
1836            .step(Event::Message(Message::AppendEntries(AppendEntries {
1837                term: 1,
1838                leader: 2,
1839                prev_log_index: 0,
1840                prev_log_term: 0,
1841                entries: Vec::new(),
1842                leader_commit: 0,
1843            })))
1844            .unwrap();
1845        assert_eq!(node.role(), Role::Follower);
1846        assert_eq!(node.leader(), Some(2));
1847        // It needs the full timeout again from zero; a single tick must not trip it.
1848        let _ = node.step(Event::Tick).unwrap();
1849        assert_eq!(node.role(), Role::Follower);
1850    }
1851
1852    /// Elects node 1 leader of a `{1,2,3}` cluster: tick until it opens a
1853    /// pre-vote round, grant the pre-vote from node 2 (which starts the real
1854    /// election), then grant the real vote from node 2 (self + 1 = quorum of 2).
1855    fn elect_multi_node_leader() -> RaftNode {
1856        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_heartbeat_interval(2));
1857        for _ in 0..1_000 {
1858            let actions = node.step(Event::Tick).expect("tick");
1859            if !actions.is_empty() {
1860                break; // opened a pre-vote round and sent PreVotes
1861            }
1862        }
1863        // Pre-vote does not advance the term; the reply carries the responder's.
1864        let _ = node
1865            .step(Event::Message(Message::PreVoteReply(PreVoteReply {
1866                term: node.term(),
1867                vote_granted: true,
1868                from: 2,
1869            })))
1870            .expect("pre-vote reply");
1871        assert_eq!(node.role(), Role::Candidate);
1872        let term = node.term();
1873        let _ = node
1874            .step(Event::Message(Message::RequestVoteReply(
1875                RequestVoteReply {
1876                    term,
1877                    vote_granted: true,
1878                    from: 2,
1879                },
1880            )))
1881            .expect("vote reply");
1882        assert_eq!(node.role(), Role::Leader);
1883        node
1884    }
1885
1886    #[test]
1887    fn test_vote_replies_elect_a_multi_node_leader() {
1888        let node = elect_multi_node_leader();
1889        assert_eq!(node.leader(), Some(1));
1890    }
1891
1892    #[test]
1893    fn test_leader_emits_heartbeats_on_interval() {
1894        let mut node = elect_multi_node_leader();
1895        // First post-election tick: no heartbeat yet (interval 2).
1896        let first = node.step(Event::Tick).unwrap();
1897        assert!(first.is_empty());
1898        let second = node.step(Event::Tick).unwrap();
1899        let heartbeats = second
1900            .iter()
1901            .filter(|a| {
1902                matches!(
1903                    a,
1904                    Action::Send {
1905                        message: Message::AppendEntries(_),
1906                        ..
1907                    }
1908                )
1909            })
1910            .count();
1911        assert_eq!(heartbeats, 2);
1912    }
1913
1914    #[test]
1915    fn test_persisted_hard_state_is_restored() {
1916        let mut log = MemoryLog::new();
1917        log.set_hard_state(HardState {
1918            term: 7,
1919            voted_for: Some(3),
1920        })
1921        .unwrap();
1922        let node = RaftNode::with_log(RaftConfig::single(1), log);
1923        assert_eq!(node.term(), 7);
1924    }
1925
1926    #[test]
1927    fn test_vote_is_persisted_to_log() {
1928        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1929        let _ = node
1930            .step(Event::Message(Message::RequestVote(RequestVote {
1931                term: 4,
1932                candidate: 2,
1933                last_log_index: 0,
1934                last_log_term: 0,
1935                force: false,
1936            })))
1937            .unwrap();
1938        assert_eq!(
1939            node.log().hard_state(),
1940            HardState {
1941                term: 4,
1942                voted_for: Some(2)
1943            }
1944        );
1945    }
1946
1947    // ---- v0.3 replication --------------------------------------------------
1948
1949    fn entry(term: Term, index: Index) -> LogEntry {
1950        LogEntry::new(term, index, vec![index as u8])
1951    }
1952
1953    fn first_append_entries(actions: &[Action], to: NodeId) -> AppendEntries {
1954        actions
1955            .iter()
1956            .find_map(|a| match a {
1957                Action::Send {
1958                    to: dst,
1959                    message: Message::AppendEntries(ae),
1960                } if *dst == to => Some(ae.clone()),
1961                _ => None,
1962            })
1963            .expect("an AppendEntries to the peer")
1964    }
1965
1966    /// Walks a `{1,2,3}` leader through replicating a proposal to follower 2 and
1967    /// confirms commit lands once a quorum (leader + one follower) holds it.
1968    #[test]
1969    fn test_leader_replicates_and_commits_on_quorum() {
1970        let mut node = elect_multi_node_leader();
1971        // Bring follower 2 into Replicate state with an accepted heartbeat.
1972        let _ = node
1973            .step(Event::Message(Message::AppendEntriesReply(
1974                AppendEntriesReply {
1975                    term: node.term(),
1976                    success: true,
1977                    from: 2,
1978                    match_index: 0,
1979                    conflict_index: 0,
1980                    conflict_term: 0,
1981                },
1982            )))
1983            .unwrap();
1984
1985        // Propose: the entry streams to follower 2 but is not yet committed.
1986        let actions = node.step(Event::Propose(b"x".to_vec())).unwrap();
1987        assert_eq!(node.commit_index(), 0);
1988        let ae = first_append_entries(&actions, 2);
1989        assert_eq!(ae.entries.len(), 1);
1990        assert_eq!(ae.entries[0].index, 1);
1991
1992        // Follower 2 acknowledges index 1: quorum reached, entry commits/applies.
1993        let applied = node
1994            .step(Event::Message(Message::AppendEntriesReply(
1995                AppendEntriesReply {
1996                    term: node.term(),
1997                    success: true,
1998                    from: 2,
1999                    match_index: 1,
2000                    conflict_index: 0,
2001                    conflict_term: 0,
2002                },
2003            )))
2004            .unwrap();
2005        assert_eq!(node.commit_index(), 1);
2006        assert!(
2007            applied
2008                .iter()
2009                .any(|a| matches!(a, Action::Apply { index: 1, .. }))
2010        );
2011    }
2012
2013    #[test]
2014    fn test_follower_appends_streamed_entries() {
2015        let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2016        let actions = node
2017            .step(Event::Message(Message::AppendEntries(AppendEntries {
2018                term: 2,
2019                leader: 1,
2020                prev_log_index: 0,
2021                prev_log_term: 0,
2022                entries: vec![entry(2, 1), entry(2, 2)],
2023                leader_commit: 2,
2024            })))
2025            .unwrap();
2026        assert_eq!(node.log().last_index(), 2);
2027        assert_eq!(node.commit_index(), 2);
2028        let reply = actions
2029            .iter()
2030            .find_map(|a| match a {
2031                Action::Send {
2032                    message: Message::AppendEntriesReply(r),
2033                    ..
2034                } => Some(r.clone()),
2035                _ => None,
2036            })
2037            .expect("a reply");
2038        assert!(reply.success);
2039        assert_eq!(reply.match_index, 2);
2040    }
2041
2042    #[test]
2043    fn test_follower_truncates_divergent_tail() {
2044        // Follower already holds [t1@1, t2@2]; leader overwrites index 2 with t3.
2045        let mut log = MemoryLog::new();
2046        log.append(&[entry(1, 1), entry(2, 2)]).unwrap();
2047        let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
2048
2049        let actions = node
2050            .step(Event::Message(Message::AppendEntries(AppendEntries {
2051                term: 3,
2052                leader: 1,
2053                prev_log_index: 1,
2054                prev_log_term: 1,
2055                entries: vec![entry(3, 2)],
2056                leader_commit: 0,
2057            })))
2058            .unwrap();
2059        assert_eq!(node.log().last_index(), 2);
2060        assert_eq!(node.log().entry(2).unwrap().term, 3);
2061        let reply = first_reply(&actions);
2062        assert!(reply.success);
2063        assert_eq!(reply.match_index, 2);
2064    }
2065
2066    #[test]
2067    fn test_follower_rejects_short_log_with_length_hint() {
2068        let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2069        let actions = node
2070            .step(Event::Message(Message::AppendEntries(AppendEntries {
2071                term: 2,
2072                leader: 1,
2073                prev_log_index: 3,
2074                prev_log_term: 1,
2075                entries: vec![entry(2, 4)],
2076                leader_commit: 0,
2077            })))
2078            .unwrap();
2079        let reply = first_reply(&actions);
2080        assert!(!reply.success);
2081        assert_eq!(reply.conflict_index, 1); // empty log => probe from index 1
2082        assert_eq!(reply.conflict_term, 0);
2083    }
2084
2085    #[test]
2086    fn test_follower_rejects_term_mismatch_with_term_hint() {
2087        // Follower holds three term-1 entries; leader probes with a wrong term.
2088        let mut log = MemoryLog::new();
2089        log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2090            .unwrap();
2091        let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
2092
2093        let actions = node
2094            .step(Event::Message(Message::AppendEntries(AppendEntries {
2095                term: 5,
2096                leader: 1,
2097                prev_log_index: 3,
2098                prev_log_term: 4, // follower has term 1 there
2099                entries: Vec::new(),
2100                leader_commit: 0,
2101            })))
2102            .unwrap();
2103        let reply = first_reply(&actions);
2104        assert!(!reply.success);
2105        assert_eq!(reply.conflict_term, 1);
2106        assert_eq!(reply.conflict_index, 1); // first index of the term-1 run
2107    }
2108
2109    #[test]
2110    fn test_rejection_backtracks_then_converges() {
2111        // Leader 1 has [t1@1, t1@2, t1@3] and a fresh follower 2 that is empty.
2112        let mut log = MemoryLog::new();
2113        log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2114            .unwrap();
2115        log.set_hard_state(HardState {
2116            term: 1,
2117            voted_for: Some(1),
2118        })
2119        .unwrap();
2120        let mut leader =
2121            RaftNode::with_log(RaftConfig::new(1, [2]).with_election_timeout(5, 5), log);
2122        let mut follower = RaftNode::new(RaftConfig::new(2, [1]));
2123
2124        // Elect leader 1 (2-node quorum is 2). Tick to a pre-vote round, grant the
2125        // pre-vote from 2 to start the real election, then grant the real vote.
2126        let mut pending = Vec::new();
2127        for _ in 0..50 {
2128            let acts = leader.step(Event::Tick).unwrap();
2129            if !acts.is_empty() {
2130                pending = acts;
2131                break;
2132            }
2133        }
2134        let _ = leader
2135            .step(Event::Message(Message::PreVoteReply(PreVoteReply {
2136                term: leader.term(),
2137                vote_granted: true,
2138                from: 2,
2139            })))
2140            .unwrap();
2141        // The candidate's term is now 2; grant it.
2142        let _ = leader
2143            .step(Event::Message(Message::RequestVoteReply(
2144                RequestVoteReply {
2145                    term: leader.term(),
2146                    vote_granted: true,
2147                    from: 2,
2148                },
2149            )))
2150            .unwrap();
2151        assert!(leader.is_leader());
2152        let _ = pending;
2153
2154        // Pump messages between the two until the follower catches up.
2155        let mut queue: Vec<(NodeId, Message)> = drain_sends(&mut leader);
2156        for _ in 0..100 {
2157            if follower.log().last_index() == 3 {
2158                break;
2159            }
2160            let mut next = Vec::new();
2161            for (to, msg) in queue.drain(..) {
2162                let acts = if to == 2 {
2163                    follower.step(Event::Message(msg)).unwrap()
2164                } else {
2165                    leader.step(Event::Message(msg)).unwrap()
2166                };
2167                next.extend(collect_sends(acts));
2168            }
2169            queue = next;
2170            if queue.is_empty() {
2171                queue = leader
2172                    .step(Event::Tick)
2173                    .unwrap()
2174                    .into_iter()
2175                    .filter_map(send_pair)
2176                    .collect();
2177            }
2178        }
2179        assert_eq!(follower.log().last_index(), 3);
2180        assert_eq!(follower.log().entry(3).unwrap().term, 1);
2181    }
2182
2183    fn first_reply(actions: &[Action]) -> AppendEntriesReply {
2184        actions
2185            .iter()
2186            .find_map(|a| match a {
2187                Action::Send {
2188                    message: Message::AppendEntriesReply(r),
2189                    ..
2190                } => Some(r.clone()),
2191                _ => None,
2192            })
2193            .expect("an AppendEntriesReply")
2194    }
2195
2196    fn send_pair(a: Action) -> Option<(NodeId, Message)> {
2197        match a {
2198            Action::Send { to, message } => Some((to, message)),
2199            _ => None,
2200        }
2201    }
2202
2203    fn collect_sends(actions: Vec<Action>) -> Vec<(NodeId, Message)> {
2204        actions.into_iter().filter_map(send_pair).collect()
2205    }
2206
2207    fn drain_sends(node: &mut RaftNode) -> Vec<(NodeId, Message)> {
2208        let acts = node.step(Event::Tick).unwrap();
2209        collect_sends(acts)
2210    }
2211
2212    // ---- durability contract ----------------------------------------------
2213
    /// A [`RaftLog`] wrapper that counts [`sync`](RaftLog::sync) calls, to prove
    /// the node makes state durable before it replies.
    ///
    /// `Default` yields an empty in-memory log with a sync count of zero.
    #[derive(Default)]
    struct SyncCountLog {
        // The in-memory log that actually stores entries and hard state.
        inner: MemoryLog,
        // Number of `sync` calls seen so far.
        syncs: std::cell::Cell<u32>,
    }
2221
2222    impl SyncCountLog {
2223        fn syncs(&self) -> u32 {
2224            self.syncs.get()
2225        }
2226    }
2227
    // Every method forwards unchanged to the wrapped `MemoryLog`; `sync` is the
    // only one with extra behaviour (it bumps the call counter first).
    impl RaftLog for SyncCountLog {
        // ---- reads: delegated as-is ----
        fn last_index(&self) -> Index {
            self.inner.last_index()
        }
        fn last_term(&self) -> Term {
            self.inner.last_term()
        }
        fn term_at(&self, index: Index) -> Option<Term> {
            self.inner.term_at(index)
        }
        fn entry(&self, index: Index) -> Option<LogEntry> {
            self.inner.entry(index)
        }
        // ---- writes: delegated as-is ----
        fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
            self.inner.append(entries)
        }
        fn truncate(&mut self, from: Index) -> Result<()> {
            self.inner.truncate(from)
        }
        fn hard_state(&self) -> HardState {
            self.inner.hard_state()
        }
        fn set_hard_state(&mut self, state: HardState) -> Result<()> {
            self.inner.set_hard_state(state)
        }
        // Counts the call, then delegates. The count goes up even if the inner
        // sync then returns an error.
        fn sync(&mut self) -> Result<()> {
            self.syncs.set(self.syncs.get() + 1);
            self.inner.sync()
        }
    }
2258
2259    #[test]
2260    fn test_granting_a_vote_persists_and_syncs_before_replying() {
2261        let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), SyncCountLog::default());
2262        let actions = node
2263            .step(Event::Message(Message::RequestVote(RequestVote {
2264                term: 4,
2265                candidate: 2,
2266                last_log_index: 0,
2267                last_log_term: 0,
2268                force: false,
2269            })))
2270            .unwrap();
2271        // The grant was produced...
2272        assert!(actions.iter().any(|a| matches!(
2273            a,
2274            Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
2275        )));
2276        // ...and the vote was durably synced as part of handling it.
2277        assert!(
2278            node.log().syncs() >= 1,
2279            "vote must be synced before the reply"
2280        );
2281        assert_eq!(node.log().hard_state().voted_for, Some(2));
2282    }
2283
2284    // ---- v0.5 snapshots ---------------------------------------------------
2285
2286    #[test]
2287    fn test_snapshot_hint_then_compaction() {
2288        // Single-node leader with a low threshold snapshots its own log.
2289        let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(2));
2290        drive_to_leader(&mut node);
2291
2292        let mut hint = None;
2293        for _ in 0..4 {
2294            let actions = node.step(Event::Propose(b"c".to_vec())).unwrap();
2295            if let Some(Action::Snapshot { index, term }) = actions
2296                .iter()
2297                .find(|a| matches!(a, Action::Snapshot { .. }))
2298                .cloned()
2299            {
2300                hint = Some((index, term));
2301                break;
2302            }
2303        }
2304        let (index, _term) = hint.expect("a snapshot hint once the log grew");
2305        assert!(index >= 2);
2306
2307        // Feed the snapshot back; the log compacts up to `index`.
2308        let _ = node
2309            .step(Event::Snapshot {
2310                index,
2311                data: b"state".to_vec(),
2312            })
2313            .unwrap();
2314        assert_eq!(node.log().snapshot_index(), index);
2315        assert_eq!(node.log().entry(1), None); // compacted away
2316        assert_eq!(node.commit_index(), node.commit_index()); // unchanged
2317    }
2318
2319    #[test]
2320    fn test_snapshot_event_rejects_uncommitted_index() {
2321        let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(0));
2322        drive_to_leader(&mut node);
2323        let _ = node.step(Event::Propose(b"c".to_vec())).unwrap(); // commit index 1
2324        // An index beyond what is committed/applied is ignored, no compaction.
2325        let _ = node
2326            .step(Event::Snapshot {
2327                index: 99,
2328                data: vec![],
2329            })
2330            .unwrap();
2331        assert_eq!(node.log().snapshot_index(), 0);
2332    }
2333
2334    #[test]
2335    fn test_leader_sends_install_snapshot_when_follower_is_behind() {
2336        // Leader 1 of {1,2,3} with a compacted log: a probe to a fresh follower
2337        // (next = 1 <= snapshot index) must be an InstallSnapshot, not an append.
2338        let mut log = MemoryLog::new();
2339        log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2340            .unwrap();
2341        log.apply_snapshot(&Snapshot::new(2, 1, b"snap".to_vec()))
2342            .unwrap();
2343        log.set_hard_state(HardState {
2344            term: 1,
2345            voted_for: Some(1),
2346        })
2347        .unwrap();
2348        let mut node =
2349            RaftNode::with_log(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5), log);
2350        // Drive an election and win it. Grant the pre-vote (starts the real
2351        // election) and then the real vote; each is ignored unless it applies.
2352        let mut elected = false;
2353        for _ in 0..50 {
2354            let _ = node.step(Event::Tick).unwrap();
2355            let _ = node
2356                .step(Event::Message(Message::PreVoteReply(PreVoteReply {
2357                    term: node.term(),
2358                    vote_granted: true,
2359                    from: 2,
2360                })))
2361                .unwrap();
2362            if node.role() == Role::Candidate {
2363                let _ = node
2364                    .step(Event::Message(Message::RequestVoteReply(
2365                        RequestVoteReply {
2366                            term: node.term(),
2367                            vote_granted: true,
2368                            from: 2,
2369                        },
2370                    )))
2371                    .unwrap();
2372            }
2373            if node.is_leader() {
2374                elected = true;
2375                break;
2376            }
2377        }
2378        assert!(elected);
2379        // A heartbeat round: peers start at next = last+1 = 4. Force a backtrack by
2380        // rejecting from node 2 down into the compacted range.
2381        let actions = node
2382            .step(Event::Message(Message::AppendEntriesReply(
2383                AppendEntriesReply {
2384                    term: node.term(),
2385                    success: false,
2386                    from: 2,
2387                    match_index: 0,
2388                    conflict_index: 1, // wants to go back to index 1 (compacted)
2389                    conflict_term: 0,
2390                },
2391            )))
2392            .unwrap();
2393        // Backtracking past the snapshot boundary yields an InstallSnapshot.
2394        assert!(actions.iter().any(|a| matches!(
2395            a,
2396            Action::Send {
2397                to: 2,
2398                message: Message::InstallSnapshot(_)
2399            }
2400        )));
2401    }
2402
2403    #[test]
2404    fn test_follower_installs_snapshot_and_restores() {
2405        let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2406        let actions = node
2407            .step(Event::Message(Message::InstallSnapshot(InstallSnapshot {
2408                term: 3,
2409                leader: 1,
2410                snapshot: Snapshot::new(8, 2, b"the state".to_vec()),
2411            })))
2412            .unwrap();
2413        assert_eq!(node.log().snapshot_index(), 8);
2414        assert_eq!(node.commit_index(), 8);
2415        // The follower asks the app to restore, and acknowledges the install.
2416        assert!(
2417            actions
2418                .iter()
2419                .any(|a| matches!(a, Action::RestoreSnapshot { index: 8, .. }))
2420        );
2421        assert!(actions.iter().any(|a| matches!(
2422            a,
2423            Action::Send { message: Message::InstallSnapshotReply(r), .. } if r.last_index == 8
2424        )));
2425    }
2426
2427    #[test]
2428    fn test_node_recovers_applied_position_from_snapshot() {
2429        // A log opened with an existing snapshot starts applied at the boundary,
2430        // so the application (which restores from the snapshot) is not re-fed it.
2431        let mut log = MemoryLog::new();
2432        log.apply_snapshot(&Snapshot::new(6, 2, b"s".to_vec()))
2433            .unwrap();
2434        let node = RaftNode::with_log(RaftConfig::single(1), log);
2435        assert_eq!(node.commit_index(), 6);
2436        assert_eq!(node.last_applied(), 6);
2437    }
2438
2439    #[test]
2440    fn test_rejected_vote_makes_no_durable_write() {
2441        // Node already at term 5 having voted; a stale lower-term request changes
2442        // nothing and must not force a sync.
2443        let mut log = SyncCountLog::default();
2444        log.set_hard_state(HardState {
2445            term: 5,
2446            voted_for: Some(9),
2447        })
2448        .unwrap();
2449        let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
2450        let before = node.log().syncs();
2451        let _ = node
2452            .step(Event::Message(Message::RequestVote(RequestVote {
2453                term: 3, // stale
2454                candidate: 2,
2455                last_log_index: 0,
2456                last_log_term: 0,
2457                force: false,
2458            })))
2459            .unwrap();
2460        assert_eq!(node.log().syncs(), before, "a no-op vote must not sync");
2461    }
2462
2463    // ---- v0.6 membership changes ------------------------------------------
2464
2465    fn membership_changed(actions: &[Action]) -> Option<Vec<NodeId>> {
2466        actions.iter().find_map(|a| match a {
2467            Action::MembershipChanged { members } => Some(members.clone()),
2468            _ => None,
2469        })
2470    }
2471
2472    #[test]
2473    fn test_node_reports_bootstrap_membership() {
2474        let node = RaftNode::new(RaftConfig::new(1, [3, 2]));
2475        assert_eq!(node.members(), &[1, 2, 3]); // sorted, includes self
2476    }
2477
2478    #[test]
2479    fn test_add_server_adopts_config_immediately() {
2480        let mut node = RaftNode::new(RaftConfig::single(1));
2481        drive_to_leader(&mut node);
2482        let actions = node.step(Event::AddServer(2)).unwrap();
2483        assert_eq!(node.members(), &[1, 2]);
2484        assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
2485        // The change is a configuration log entry, not an applied command.
2486        let last = node.log().last_index();
2487        assert_eq!(node.log().entry(last).unwrap().members(), Some(vec![1, 2]));
2488    }
2489
2490    #[test]
2491    fn test_remove_server_adopts_config() {
2492        let mut node = elect_multi_node_leader(); // leader 1 of {1,2,3}
2493        let actions = node.step(Event::RemoveServer(3)).unwrap();
2494        assert_eq!(node.members(), &[1, 2]);
2495        assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
2496    }
2497
2498    #[test]
2499    fn test_add_existing_member_is_noop() {
2500        let mut node = elect_multi_node_leader();
2501        let actions = node.step(Event::AddServer(2)).unwrap();
2502        assert!(actions.is_empty());
2503        assert_eq!(node.members(), &[1, 2, 3]);
2504    }
2505
2506    #[test]
2507    fn test_one_config_change_at_a_time() {
2508        // Single-node leader: adding node 2 makes the config entry uncommittable
2509        // alone (quorum becomes 2), so a second change is rejected until it lands.
2510        let mut node = RaftNode::new(RaftConfig::single(1));
2511        drive_to_leader(&mut node);
2512        let _ = node.step(Event::AddServer(2)).unwrap();
2513        let err = node.step(Event::AddServer(3)).unwrap_err();
2514        assert!(matches!(err, Error::ConfigInProgress));
2515    }
2516
2517    #[test]
2518    fn test_change_membership_rejected_on_follower() {
2519        let mut node = RaftNode::new(RaftConfig::new(2, [1, 3]));
2520        let err = node.step(Event::AddServer(4)).unwrap_err();
2521        assert!(matches!(err, Error::NotLeader { .. }));
2522    }
2523
2524    #[test]
2525    fn test_membership_recovered_from_config_entry() {
2526        // A log whose latest entry is a configuration change restores that config.
2527        let mut log = MemoryLog::new();
2528        log.append(&[
2529            LogEntry::new(1, 1, b"x".to_vec()),
2530            LogEntry::config(1, 2, &[1, 2, 3, 4]),
2531        ])
2532        .unwrap();
2533        let node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
2534        assert_eq!(node.members(), &[1, 2, 3, 4]);
2535    }
2536
2537    #[test]
2538    fn test_membership_recovered_from_snapshot_config() {
2539        let mut log = MemoryLog::new();
2540        log.apply_snapshot(&Snapshot::with_config(
2541            5,
2542            2,
2543            vec![1, 2, 3, 4, 5],
2544            b"s".to_vec(),
2545        ))
2546        .unwrap();
2547        let node = RaftNode::with_log(RaftConfig::single(1), log);
2548        assert_eq!(node.members(), &[1, 2, 3, 4, 5]);
2549    }
2550
2551    #[test]
2552    fn test_follower_adopts_config_from_append() {
2553        let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2554        let actions = node
2555            .step(Event::Message(Message::AppendEntries(AppendEntries {
2556                term: 2,
2557                leader: 1,
2558                prev_log_index: 0,
2559                prev_log_term: 0,
2560                entries: vec![LogEntry::config(2, 1, &[1, 5, 9])],
2561                leader_commit: 0,
2562            })))
2563            .unwrap();
2564        assert_eq!(node.members(), &[1, 5, 9]);
2565        assert_eq!(membership_changed(&actions), Some(vec![1, 5, 9]));
2566    }
2567
2568    // ---- v0.6 leadership transfer -----------------------------------------
2569
2570    #[test]
2571    fn test_timeout_now_triggers_immediate_election() {
2572        let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(1000, 1000));
2573        // Far from its election timeout, yet TimeoutNow makes it campaign at once.
2574        let actions = node
2575            .step(Event::Message(Message::TimeoutNow(TimeoutNow {
2576                term: 0,
2577                leader: 2,
2578            })))
2579            .unwrap();
2580        assert_eq!(node.role(), Role::Candidate);
2581        assert!(actions.iter().any(|a| matches!(
2582            a,
2583            Action::Send {
2584                message: Message::RequestVote(_),
2585                ..
2586            }
2587        )));
2588    }
2589
2590    #[test]
2591    fn test_transfer_to_caught_up_follower_sends_timeout_now() {
2592        let mut node = elect_multi_node_leader(); // leader 1 of {1,2,3}
2593        // Follower 2 acknowledges the leader's (empty) log, so it is caught up.
2594        let _ = node
2595            .step(Event::Message(Message::AppendEntriesReply(
2596                AppendEntriesReply {
2597                    term: node.term(),
2598                    success: true,
2599                    from: 2,
2600                    match_index: 0,
2601                    conflict_index: 0,
2602                    conflict_term: 0,
2603                },
2604            )))
2605            .unwrap();
2606        let actions = node.step(Event::TransferLeadership(2)).unwrap();
2607        assert!(actions.iter().any(|a| matches!(
2608            a,
2609            Action::Send {
2610                to: 2,
2611                message: Message::TimeoutNow(_)
2612            }
2613        )));
2614    }
2615
2616    #[test]
2617    fn test_transfer_to_non_voter_is_noop() {
2618        let mut node = elect_multi_node_leader();
2619        let actions = node.step(Event::TransferLeadership(99)).unwrap();
2620        assert!(actions.is_empty());
2621    }
2622
2623    #[test]
2624    fn test_non_voter_does_not_start_election() {
2625        // A node not in its own configuration follows but never campaigns.
2626        let mut log = MemoryLog::new();
2627        log.append(&[LogEntry::config(1, 1, &[1, 2, 3])]).unwrap(); // self (5) excluded
2628        let mut node = RaftNode::with_log(
2629            RaftConfig::new(5, [1, 2, 3]).with_election_timeout(2, 2),
2630            log,
2631        );
2632        assert_eq!(node.members(), &[1, 2, 3]);
2633        for _ in 0..50 {
2634            let _ = node.step(Event::Tick).unwrap();
2635        }
2636        assert_eq!(node.role(), Role::Follower);
2637    }
2638}