1use crate::config::RaftConfig;
31use crate::error::{Error, Result};
32use crate::log::{MemoryLog, RaftLog};
33use crate::message::{
34 AppendEntries, AppendEntriesReply, InstallSnapshot, InstallSnapshotReply, Message, PreVote,
35 PreVoteReply, RequestVote, RequestVoteReply, TimeoutNow,
36};
37use crate::rng::Rng;
38use crate::types::{HardState, Index, LogEntry, NodeId, Role, Snapshot, Term};
39
40fn sorted_members(ids: impl IntoIterator<Item = NodeId>) -> Vec<NodeId> {
43 let mut v: Vec<NodeId> = ids.into_iter().collect();
44 v.sort_unstable();
45 v.dedup();
46 v
47}
48
49pub enum Event {
67 Tick,
69 Message(Message),
71 Propose(Vec<u8>),
76 Snapshot {
82 index: Index,
84 data: Vec<u8>,
86 },
87 AddServer(NodeId),
94 RemoveServer(NodeId),
99 TransferLeadership(NodeId),
105}
106
107#[non_exhaustive]
136#[derive(Clone, Debug, PartialEq, Eq)]
137pub enum Action {
138 Send {
140 to: NodeId,
142 message: Message,
144 },
145 Apply {
150 index: Index,
152 term: Term,
154 command: Vec<u8>,
156 },
157 Snapshot {
165 index: Index,
167 term: Term,
169 },
170 RestoreSnapshot {
178 index: Index,
180 term: Term,
182 data: Vec<u8>,
184 },
185 MembershipChanged {
193 members: Vec<NodeId>,
195 },
196}
197
/// How the leader is currently replicating to one follower.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProgressState {
    /// The follower's position is uncertain: entries are sent without
    /// advancing `next_index` until an acknowledgement arrives.
    Probe,
    /// The follower is in sync: `next_index` is advanced optimistically as
    /// soon as entries are sent, pipelining appends.
    Replicate,
}
211
212#[derive(Clone, Copy, Debug)]
214struct Progress {
215 id: NodeId,
217 next_index: Index,
219 match_index: Index,
221 state: ProgressState,
223}
224
225pub struct RaftNode<L: RaftLog = MemoryLog> {
245 id: NodeId,
246 voters: Vec<NodeId>,
250 base_config: Vec<NodeId>,
254 config_index: Index,
257 election_timeout_min: u32,
258 election_timeout_max: u32,
259 heartbeat_interval: u32,
260 max_batch: usize,
261 snapshot_threshold: usize,
262
263 log: L,
264 role: Role,
265 current_term: Term,
266 voted_for: Option<NodeId>,
267 leader_id: Option<NodeId>,
268 commit_index: Index,
269 last_applied: Index,
270
271 election_elapsed: u32,
272 heartbeat_elapsed: u32,
273 election_timeout: u32,
274 votes: Vec<NodeId>,
275 pre_voting: bool,
279 pre_votes: Vec<NodeId>,
281 progress: Vec<Progress>,
284 snapshot_hinted_at: Index,
287 transfer_target: Option<NodeId>,
290 rng: Rng,
291}
292
293impl RaftNode<MemoryLog> {
294 #[must_use]
309 pub fn new(config: RaftConfig) -> Self {
310 Self::with_log(config, MemoryLog::new())
311 }
312}
313
314impl<L: RaftLog> RaftNode<L> {
315 #[must_use]
332 pub fn with_log(config: RaftConfig, log: L) -> Self {
333 let hard = log.hard_state();
334 let base = log.snapshot_index();
338
339 let bootstrap = sorted_members(config.peers.iter().copied().chain([config.id]));
342 let base_config = match log.snapshot() {
343 Some(s) if !s.config.is_empty() => s.config,
344 _ => bootstrap,
345 };
346 let mut voters = base_config.clone();
349 let mut config_index = 0;
350 let mut i = log.last_index();
351 while i > base {
352 if let Some(members) = log.entry(i).and_then(|e| e.members()) {
353 voters = members;
354 config_index = i;
355 break;
356 }
357 i -= 1;
358 }
359
360 let mut rng = Rng::new(config.seed);
361 let election_timeout =
362 rng.gen_range(config.election_timeout_min, config.election_timeout_max);
363 Self {
364 id: config.id,
365 voters,
366 base_config,
367 config_index,
368 election_timeout_min: config.election_timeout_min,
369 election_timeout_max: config.election_timeout_max,
370 heartbeat_interval: config.heartbeat_interval,
371 max_batch: config.max_batch,
372 snapshot_threshold: config.snapshot_threshold,
373 log,
374 role: Role::Follower,
375 current_term: hard.term,
376 voted_for: hard.voted_for,
377 leader_id: None,
378 commit_index: base,
379 last_applied: base,
380 election_elapsed: 0,
381 heartbeat_elapsed: 0,
382 election_timeout,
383 votes: Vec::new(),
384 pre_voting: false,
385 pre_votes: Vec::new(),
386 progress: Vec::new(),
387 snapshot_hinted_at: base,
388 transfer_target: None,
389 rng,
390 }
391 }
392
393 #[inline]
397 #[must_use]
398 pub fn id(&self) -> NodeId {
399 self.id
400 }
401
402 #[inline]
404 #[must_use]
405 pub fn role(&self) -> Role {
406 self.role
407 }
408
409 #[inline]
423 #[must_use]
424 pub fn is_leader(&self) -> bool {
425 self.role == Role::Leader
426 }
427
428 #[inline]
430 #[must_use]
431 pub fn term(&self) -> Term {
432 self.current_term
433 }
434
435 #[inline]
437 #[must_use]
438 pub fn leader(&self) -> Option<NodeId> {
439 self.leader_id
440 }
441
442 #[inline]
444 #[must_use]
445 pub fn commit_index(&self) -> Index {
446 self.commit_index
447 }
448
449 #[inline]
451 #[must_use]
452 pub fn last_applied(&self) -> Index {
453 self.last_applied
454 }
455
456 #[inline]
467 #[must_use]
468 pub fn log(&self) -> &L {
469 &self.log
470 }
471
472 #[inline]
486 #[must_use]
487 pub fn members(&self) -> &[NodeId] {
488 &self.voters
489 }
490
491 #[inline]
496 fn quorum(&self) -> usize {
497 self.voters.len() / 2 + 1
498 }
499
500 #[inline]
502 fn is_voter(&self) -> bool {
503 self.voters.contains(&self.id)
504 }
505
506 fn set_config(&mut self, voters: Vec<NodeId>, config_index: Index, actions: &mut Vec<Action>) {
511 let changed = voters != self.voters;
512 self.voters = voters;
513 self.config_index = config_index;
514 if self.role == Role::Leader {
515 self.rebuild_progress();
516 }
517 if changed {
518 actions.push(Action::MembershipChanged {
519 members: self.voters.clone(),
520 });
521 }
522 }
523
524 fn refresh_config(&mut self, actions: &mut Vec<Action>) {
528 let base = self.log.snapshot_index();
529 let mut voters = self.base_config.clone();
530 let mut config_index = 0;
531 let mut i = self.log.last_index();
532 while i > base {
533 if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
534 voters = members;
535 config_index = i;
536 break;
537 }
538 i -= 1;
539 }
540 self.set_config(voters, config_index, actions);
541 }
542
543 fn rebuild_progress(&mut self) {
546 let next = self.log.last_index() + 1;
547 let old = core::mem::take(&mut self.progress);
548 self.progress = self
549 .voters
550 .iter()
551 .filter(|&&id| id != self.id)
552 .map(|&id| {
553 old.iter()
554 .find(|p| p.id == id)
555 .copied()
556 .unwrap_or(Progress {
557 id,
558 next_index: next,
559 match_index: 0,
560 state: ProgressState::Probe,
561 })
562 })
563 .collect();
564 }
565
566 fn progress_index(&self, id: NodeId) -> Option<usize> {
568 self.progress.iter().position(|p| p.id == id)
569 }
570
571 pub fn step(&mut self, event: Event) -> Result<Vec<Action>> {
602 match event {
603 Event::Tick => self.tick(),
604 Event::Message(message) => self.handle_message(message),
605 Event::Propose(command) => self.propose(command),
606 Event::Snapshot { index, data } => self.handle_snapshot_event(index, data),
607 Event::AddServer(id) => self.change_membership(Some(id), None),
608 Event::RemoveServer(id) => self.change_membership(None, Some(id)),
609 Event::TransferLeadership(target) => self.transfer_leadership(target),
610 }
611 }
612
613 fn tick(&mut self) -> Result<Vec<Action>> {
616 let mut actions = Vec::new();
617 match self.role {
618 Role::Follower | Role::Candidate => {
619 self.election_elapsed += 1;
620 if self.election_elapsed >= self.election_timeout && self.is_voter() {
625 self.start_pre_vote(&mut actions)?;
626 }
627 }
628 Role::Leader => {
629 self.heartbeat_elapsed += 1;
630 if self.heartbeat_elapsed >= self.heartbeat_interval {
631 self.heartbeat_elapsed = 0;
632 self.replicate_to_all(&mut actions);
633 }
634 }
635 }
636 Ok(actions)
637 }
638
639 fn start_pre_vote(&mut self, actions: &mut Vec<Action>) -> Result<()> {
649 self.role = Role::Follower;
650 self.leader_id = None;
651 self.pre_voting = true;
652 self.pre_votes.clear();
653 self.pre_votes.push(self.id);
654 self.reset_election_timer();
655
656 if self.pre_votes.len() >= self.quorum() {
659 return self.start_election(false, actions);
660 }
661
662 let last_log_index = self.log.last_index();
663 let last_log_term = self.log.last_term();
664 let term = self.current_term + 1; let id = self.id;
666 for &peer in &self.voters {
667 if peer == id {
668 continue;
669 }
670 actions.push(Action::Send {
671 to: peer,
672 message: Message::PreVote(PreVote {
673 term,
674 candidate: id,
675 last_log_index,
676 last_log_term,
677 }),
678 });
679 }
680 Ok(())
681 }
682
683 fn start_election(&mut self, force: bool, actions: &mut Vec<Action>) -> Result<()> {
684 self.role = Role::Candidate;
685 self.current_term += 1;
686 self.voted_for = Some(self.id);
687 self.leader_id = None;
688 self.transfer_target = None;
689 self.pre_voting = false;
690 self.pre_votes.clear();
691 self.progress.clear();
692 self.votes.clear();
693 self.votes.push(self.id);
694 self.reset_election_timer();
695 self.persist_hard_state()?;
696
697 if self.votes.len() >= self.quorum() {
700 self.become_leader(actions);
701 return Ok(());
702 }
703
704 let last_log_index = self.log.last_index();
705 let last_log_term = self.log.last_term();
706 let term = self.current_term;
707 let id = self.id;
708 for &peer in &self.voters {
709 if peer == id {
710 continue;
711 }
712 actions.push(Action::Send {
713 to: peer,
714 message: Message::RequestVote(RequestVote {
715 term,
716 candidate: id,
717 last_log_index,
718 last_log_term,
719 force,
720 }),
721 });
722 }
723 Ok(())
724 }
725
726 fn become_leader(&mut self, actions: &mut Vec<Action>) {
727 self.role = Role::Leader;
728 self.leader_id = Some(self.id);
729 self.heartbeat_elapsed = 0;
730 self.transfer_target = None;
731 self.pre_voting = false;
732 self.pre_votes.clear();
733 self.rebuild_progress();
737 self.replicate_to_all(actions);
740 self.advance_commit(actions);
741 }
742
743 fn replicate_to_all(&mut self, actions: &mut Vec<Action>) {
747 for i in 0..self.progress.len() {
748 self.send_append(i, actions);
749 }
750 }
751
752 fn replicate_to_streaming(&mut self, actions: &mut Vec<Action>) {
756 for i in 0..self.progress.len() {
757 if self.progress[i].state == ProgressState::Replicate {
758 self.send_append(i, actions);
759 }
760 }
761 }
762
763 fn send_append(&mut self, i: usize, actions: &mut Vec<Action>) {
768 let next = self.progress[i].next_index;
769 if next <= self.log.snapshot_index() {
772 self.send_snapshot(i, actions);
773 return;
774 }
775
776 let peer = self.progress[i].id;
777 let state = self.progress[i].state;
778 let prev_log_index = next - 1;
779 let prev_log_term = self.log.term_at(prev_log_index).unwrap_or(0);
780
781 let last = self.log.last_index();
782 let entries = if last >= next {
783 let to = last.min(next + self.max_batch as Index - 1);
784 self.log.entries(next, to)
785 } else {
786 Vec::new()
787 };
788 let count = entries.len() as Index;
789
790 actions.push(Action::Send {
791 to: peer,
792 message: Message::AppendEntries(AppendEntries {
793 term: self.current_term,
794 leader: self.id,
795 prev_log_index,
796 prev_log_term,
797 entries,
798 leader_commit: self.commit_index,
799 }),
800 });
801
802 if count > 0 && state == ProgressState::Replicate {
803 self.progress[i].next_index = next + count;
804 }
805 }
806
807 fn send_snapshot(&mut self, i: usize, actions: &mut Vec<Action>) {
811 if let Some(snapshot) = self.log.snapshot() {
812 self.progress[i].state = ProgressState::Probe;
813 actions.push(Action::Send {
814 to: self.progress[i].id,
815 message: Message::InstallSnapshot(InstallSnapshot {
816 term: self.current_term,
817 leader: self.id,
818 snapshot,
819 }),
820 });
821 }
822 }
823
824 fn propose(&mut self, command: Vec<u8>) -> Result<Vec<Action>> {
827 if self.role != Role::Leader || self.transfer_target.is_some() {
828 return Err(Error::NotLeader {
829 leader: self.leader_id,
830 });
831 }
832 let index = self.log.last_index() + 1;
833 let entry = LogEntry::new(self.current_term, index, command);
834 self.log.append(core::slice::from_ref(&entry))?;
835 self.log.sync()?;
836
837 let mut actions = Vec::new();
838 self.replicate_to_streaming(&mut actions);
841 self.advance_commit(&mut actions);
842 Ok(actions)
843 }
844
845 fn advance_commit(&mut self, actions: &mut Vec<Action>) {
854 let last = self.log.last_index();
855 let quorum = self.quorum();
856 let leader_holds = usize::from(self.is_voter());
860 let mut new_commit = self.commit_index;
861 let mut n = last;
862 while n > self.commit_index {
863 match self.log.term_at(n) {
864 Some(term) if term == self.current_term => {
865 let mut replicas = leader_holds;
866 for p in &self.progress {
867 if p.match_index >= n {
868 replicas += 1;
869 }
870 }
871 if replicas >= quorum {
872 new_commit = n;
873 break; }
875 }
876 Some(term) if term < self.current_term => break,
879 _ => {}
880 }
881 n -= 1;
882 }
883 if new_commit > self.commit_index {
884 self.commit_index = new_commit;
885 self.drain_applies(actions);
886 if self.role == Role::Leader
888 && !self.is_voter()
889 && self.config_index != 0
890 && self.commit_index >= self.config_index
891 {
892 self.step_down_to_follower();
893 }
894 }
895 }
896
897 fn step_down_to_follower(&mut self) {
900 self.role = Role::Follower;
901 self.leader_id = None;
902 self.transfer_target = None;
903 self.pre_voting = false;
904 self.pre_votes.clear();
905 self.progress.clear();
906 self.votes.clear();
907 }
908
909 fn drain_applies(&mut self, actions: &mut Vec<Action>) {
910 while self.last_applied < self.commit_index {
911 self.last_applied += 1;
912 if let Some(entry) = self.log.entry(self.last_applied) {
916 if entry.members().is_none() {
917 actions.push(Action::Apply {
918 index: entry.index,
919 term: entry.term,
920 command: entry.command,
921 });
922 }
923 }
924 }
925 self.maybe_hint_snapshot(actions);
926 }
927
928 fn maybe_hint_snapshot(&mut self, actions: &mut Vec<Action>) {
931 if self.snapshot_threshold == 0 {
932 return;
933 }
934 let base = self.log.snapshot_index();
935 let grown = self.last_applied.saturating_sub(base) as usize;
936 if grown >= self.snapshot_threshold && self.last_applied > self.snapshot_hinted_at {
937 if let Some(term) = self.log.term_at(self.last_applied) {
938 self.snapshot_hinted_at = self.last_applied;
939 actions.push(Action::Snapshot {
940 index: self.last_applied,
941 term,
942 });
943 }
944 }
945 }
946
947 fn handle_snapshot_event(&mut self, index: Index, data: Vec<u8>) -> Result<Vec<Action>> {
956 if index > self.commit_index
957 || index > self.last_applied
958 || index <= self.log.snapshot_index()
959 {
960 return Ok(Vec::new());
961 }
962 let Some(term) = self.log.term_at(index) else {
963 return Ok(Vec::new());
964 };
965 let config = self.config_at(index);
969 self.base_config = config.clone();
970 self.log
971 .apply_snapshot(&Snapshot::with_config(index, term, config, data))?;
972 self.log.sync()?;
973 if self.snapshot_hinted_at < index {
974 self.snapshot_hinted_at = index;
975 }
976 let mut actions = Vec::new();
977 self.refresh_config(&mut actions);
978 Ok(actions)
979 }
980
981 fn config_at(&self, index: Index) -> Vec<NodeId> {
984 let base = self.log.snapshot_index();
985 let mut i = index.min(self.log.last_index());
986 while i > base {
987 if let Some(members) = self.log.entry(i).and_then(|e| e.members()) {
988 return members;
989 }
990 i -= 1;
991 }
992 self.base_config.clone()
993 }
994
995 fn handle_message(&mut self, message: Message) -> Result<Vec<Action>> {
998 if matches!(message, Message::RequestVote(ref rv) if !rv.force)
1005 && self.leader_id.is_some()
1006 && self.election_elapsed < self.election_timeout_min
1007 {
1008 return Ok(Vec::new());
1009 }
1010
1011 let mut actions = Vec::new();
1012
1013 let message = match message {
1018 Message::PreVote(pv) => {
1019 self.handle_pre_vote(pv, &mut actions);
1020 return Ok(actions);
1021 }
1022 Message::PreVoteReply(reply) => {
1023 self.handle_pre_vote_reply(reply, &mut actions)?;
1024 return Ok(actions);
1025 }
1026 other => other,
1027 };
1028
1029 if message.term() > self.current_term {
1032 self.become_follower(message.term(), None)?;
1033 }
1034
1035 match message {
1036 Message::RequestVote(rv) => self.handle_request_vote(rv, &mut actions)?,
1037 Message::RequestVoteReply(reply) => self.handle_vote_reply(reply, &mut actions),
1038 Message::AppendEntries(ae) => self.handle_append_entries(ae, &mut actions)?,
1039 Message::AppendEntriesReply(reply) => self.handle_append_reply(reply, &mut actions),
1040 Message::InstallSnapshot(rpc) => self.handle_install_snapshot(rpc, &mut actions)?,
1041 Message::InstallSnapshotReply(reply) => {
1042 self.handle_install_snapshot_reply(reply, &mut actions);
1043 }
1044 Message::TimeoutNow(rpc) => self.handle_timeout_now(rpc, &mut actions)?,
1045 Message::PreVote(_) | Message::PreVoteReply(_) => {}
1047 }
1048 Ok(actions)
1049 }
1050
1051 fn become_follower(&mut self, term: Term, leader: Option<NodeId>) -> Result<()> {
1052 let hard_state_changed = term > self.current_term;
1053 self.role = Role::Follower;
1054 if term > self.current_term {
1055 self.current_term = term;
1056 self.voted_for = None;
1057 }
1058 self.leader_id = leader;
1059 self.transfer_target = None;
1060 self.pre_voting = false;
1061 self.pre_votes.clear();
1062 self.votes.clear();
1063 self.progress.clear();
1064 if hard_state_changed {
1065 self.persist_hard_state()?;
1066 }
1067 Ok(())
1068 }
1069
1070 fn handle_request_vote(&mut self, rv: RequestVote, actions: &mut Vec<Action>) -> Result<()> {
1071 let mut granted = false;
1072 if rv.term >= self.current_term {
1073 let can_vote = match self.voted_for {
1074 None => true,
1075 Some(c) => c == rv.candidate,
1076 };
1077 let log_ok = self.candidate_log_up_to_date(rv.last_log_term, rv.last_log_index);
1078 if can_vote && log_ok {
1079 granted = true;
1080 self.voted_for = Some(rv.candidate);
1081 self.persist_hard_state()?;
1082 self.reset_election_timer();
1083 }
1084 }
1085 actions.push(Action::Send {
1086 to: rv.candidate,
1087 message: Message::RequestVoteReply(RequestVoteReply {
1088 term: self.current_term,
1089 vote_granted: granted,
1090 from: self.id,
1091 }),
1092 });
1093 Ok(())
1094 }
1095
1096 fn candidate_log_up_to_date(&self, cand_last_term: Term, cand_last_index: Index) -> bool {
1100 let my_term = self.log.last_term();
1101 let my_index = self.log.last_index();
1102 cand_last_term > my_term || (cand_last_term == my_term && cand_last_index >= my_index)
1103 }
1104
1105 fn handle_vote_reply(&mut self, reply: RequestVoteReply, actions: &mut Vec<Action>) {
1106 if self.role != Role::Candidate || reply.term != self.current_term {
1107 return;
1108 }
1109 if reply.vote_granted && !self.votes.contains(&reply.from) {
1110 self.votes.push(reply.from);
1111 if self.votes.len() >= self.quorum() {
1112 self.become_leader(actions);
1113 }
1114 }
1115 }
1116
1117 fn handle_pre_vote(&mut self, pv: PreVote, actions: &mut Vec<Action>) {
1124 let have_active_leader =
1125 self.leader_id.is_some() && self.election_elapsed < self.election_timeout_min;
1126 let granted = pv.term >= self.current_term
1127 && !have_active_leader
1128 && self.candidate_log_up_to_date(pv.last_log_term, pv.last_log_index);
1129 actions.push(Action::Send {
1130 to: pv.candidate,
1131 message: Message::PreVoteReply(PreVoteReply {
1132 term: self.current_term,
1133 vote_granted: granted,
1134 from: self.id,
1135 }),
1136 });
1137 }
1138
1139 fn handle_pre_vote_reply(
1144 &mut self,
1145 reply: PreVoteReply,
1146 actions: &mut Vec<Action>,
1147 ) -> Result<()> {
1148 if !self.pre_voting {
1149 return Ok(());
1150 }
1151 if reply.term > self.current_term {
1152 self.pre_voting = false;
1153 self.pre_votes.clear();
1154 return self.become_follower(reply.term, None);
1155 }
1156 if reply.vote_granted && !self.pre_votes.contains(&reply.from) {
1157 self.pre_votes.push(reply.from);
1158 if self.pre_votes.len() >= self.quorum() {
1159 self.start_election(false, actions)?;
1160 }
1161 }
1162 Ok(())
1163 }
1164
1165 fn handle_append_entries(
1166 &mut self,
1167 ae: AppendEntries,
1168 actions: &mut Vec<Action>,
1169 ) -> Result<()> {
1170 let mut reply = AppendEntriesReply {
1171 term: self.current_term,
1172 success: false,
1173 from: self.id,
1174 match_index: 0,
1175 conflict_index: 0,
1176 conflict_term: 0,
1177 };
1178
1179 if ae.term < self.current_term {
1181 actions.push(Action::Send {
1182 to: ae.leader,
1183 message: Message::AppendEntriesReply(reply),
1184 });
1185 return Ok(());
1186 }
1187
1188 self.role = Role::Follower;
1191 self.leader_id = Some(ae.leader);
1192 self.pre_voting = false;
1193 self.reset_election_timer();
1194
1195 let base = self.log.snapshot_index();
1201 if ae.prev_log_index < base {
1202 if ae.leader_commit > self.commit_index {
1203 self.commit_index = ae.leader_commit.min(base);
1204 self.drain_applies(actions);
1205 }
1206 reply.success = true;
1207 reply.match_index = base;
1208 actions.push(Action::Send {
1209 to: ae.leader,
1210 message: Message::AppendEntriesReply(reply),
1211 });
1212 return Ok(());
1213 }
1214
1215 let prev_ok = self.log.term_at(ae.prev_log_index) == Some(ae.prev_log_term);
1219 if !prev_ok {
1220 let last = self.log.last_index();
1222 if ae.prev_log_index > last {
1223 reply.conflict_index = last + 1;
1224 reply.conflict_term = 0;
1225 } else {
1226 let conflict_term = self.log.term_at(ae.prev_log_index).unwrap_or(0);
1227 reply.conflict_term = conflict_term;
1228 reply.conflict_index = self.first_index_of_term(conflict_term, ae.prev_log_index);
1229 }
1230 actions.push(Action::Send {
1231 to: ae.leader,
1232 message: Message::AppendEntriesReply(reply),
1233 });
1234 return Ok(());
1235 }
1236
1237 let (match_index, truncated) = if ae.entries.is_empty() {
1240 (ae.prev_log_index, false)
1241 } else {
1242 self.append_from_leader(&ae.entries)?
1243 };
1244
1245 if truncated || ae.entries.iter().any(|e| e.members().is_some()) {
1248 self.refresh_config(actions);
1249 }
1250
1251 if ae.leader_commit > self.commit_index {
1252 self.commit_index = ae.leader_commit.min(match_index);
1254 self.drain_applies(actions);
1255 }
1256
1257 reply.success = true;
1258 reply.match_index = match_index;
1259 actions.push(Action::Send {
1260 to: ae.leader,
1261 message: Message::AppendEntriesReply(reply),
1262 });
1263 Ok(())
1264 }
1265
1266 fn append_from_leader(&mut self, entries: &[LogEntry]) -> Result<(Index, bool)> {
1275 let mut i = 0;
1276 let mut truncated = false;
1277 while i < entries.len() {
1278 let entry = &entries[i];
1279 match self.log.term_at(entry.index) {
1280 Some(term) if term == entry.term => i += 1,
1281 Some(_) => {
1282 self.log.truncate(entry.index)?;
1284 truncated = true;
1285 break;
1286 }
1287 None => break, }
1289 }
1290 if i < entries.len() {
1291 self.log.append(&entries[i..])?;
1292 self.log.sync()?;
1293 }
1294 Ok((entries[entries.len() - 1].index, truncated))
1295 }
1296
1297 fn handle_append_reply(&mut self, reply: AppendEntriesReply, actions: &mut Vec<Action>) {
1298 if self.role != Role::Leader || reply.term != self.current_term {
1299 return; }
1301 let Some(i) = self.progress_index(reply.from) else {
1302 return;
1303 };
1304
1305 if reply.success {
1306 if reply.match_index > self.progress[i].match_index {
1308 self.progress[i].match_index = reply.match_index;
1309 }
1310 let want_next = self.progress[i].match_index + 1;
1311 if want_next > self.progress[i].next_index {
1312 self.progress[i].next_index = want_next;
1313 }
1314 self.progress[i].state = ProgressState::Replicate;
1315 self.advance_commit(actions);
1316 self.maybe_send_timeout_now(reply.from, actions);
1319 if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
1321 self.send_append(i, actions);
1322 }
1323 } else {
1324 let next = self.progress[i].next_index;
1327 let matched = self.progress[i].match_index;
1328 self.progress[i].next_index =
1329 self.rejected_next(next, matched, reply.conflict_index, reply.conflict_term);
1330 self.progress[i].state = ProgressState::Probe;
1331 self.send_append(i, actions);
1332 }
1333 }
1334
1335 fn handle_install_snapshot(
1339 &mut self,
1340 rpc: InstallSnapshot,
1341 actions: &mut Vec<Action>,
1342 ) -> Result<()> {
1343 if rpc.term < self.current_term {
1344 actions.push(Action::Send {
1345 to: rpc.leader,
1346 message: Message::InstallSnapshotReply(InstallSnapshotReply {
1347 term: self.current_term,
1348 from: self.id,
1349 last_index: 0,
1350 }),
1351 });
1352 return Ok(());
1353 }
1354
1355 self.role = Role::Follower;
1357 self.leader_id = Some(rpc.leader);
1358 self.pre_voting = false;
1359 self.reset_election_timer();
1360
1361 let snap_index = rpc.snapshot.index;
1362 let snap_term = rpc.snapshot.term;
1363 if snap_index > self.log.snapshot_index() && snap_index > self.commit_index {
1368 if !rpc.snapshot.config.is_empty() {
1371 self.base_config = rpc.snapshot.config.clone();
1372 }
1373 self.log.apply_snapshot(&rpc.snapshot)?;
1374 self.log.sync()?;
1375 self.commit_index = snap_index;
1376 self.last_applied = snap_index;
1377 if snap_index > self.snapshot_hinted_at {
1378 self.snapshot_hinted_at = snap_index;
1379 }
1380 self.refresh_config(actions);
1381 actions.push(Action::RestoreSnapshot {
1382 index: snap_index,
1383 term: snap_term,
1384 data: rpc.snapshot.data,
1385 });
1386 }
1387
1388 let last_index = self
1391 .log
1392 .snapshot_index()
1393 .max(snap_index.min(self.commit_index));
1394 actions.push(Action::Send {
1395 to: rpc.leader,
1396 message: Message::InstallSnapshotReply(InstallSnapshotReply {
1397 term: self.current_term,
1398 from: self.id,
1399 last_index,
1400 }),
1401 });
1402 Ok(())
1403 }
1404
1405 fn handle_install_snapshot_reply(
1408 &mut self,
1409 reply: InstallSnapshotReply,
1410 actions: &mut Vec<Action>,
1411 ) {
1412 if self.role != Role::Leader || reply.term != self.current_term {
1413 return;
1414 }
1415 let Some(i) = self.progress_index(reply.from) else {
1416 return;
1417 };
1418 if reply.last_index > self.progress[i].match_index {
1419 self.progress[i].match_index = reply.last_index;
1420 }
1421 self.progress[i].next_index = self.progress[i].match_index + 1;
1422 self.progress[i].state = ProgressState::Replicate;
1423 self.advance_commit(actions);
1424 self.maybe_send_timeout_now(reply.from, actions);
1425 if self.role == Role::Leader && self.progress[i].next_index <= self.log.last_index() {
1426 self.send_append(i, actions);
1427 }
1428 }
1429
1430 fn rejected_next(
1436 &self,
1437 current_next: Index,
1438 match_index: Index,
1439 conflict_index: Index,
1440 conflict_term: Term,
1441 ) -> Index {
1442 let floor = match_index + 1;
1443 let mut target = conflict_index.max(1);
1444 if conflict_term > 0 {
1445 if let Some(last) = self.last_index_of_term(conflict_term) {
1446 target = last + 1;
1447 }
1448 }
1449 let ceil = current_next.saturating_sub(1).max(floor);
1450 target.clamp(floor, ceil)
1451 }
1452
1453 fn first_index_of_term(&self, term: Term, upto: Index) -> Index {
1455 let mut i = upto;
1456 while i > 1 && self.log.term_at(i - 1) == Some(term) {
1457 i -= 1;
1458 }
1459 i
1460 }
1461
1462 fn last_index_of_term(&self, term: Term) -> Option<Index> {
1465 let mut i = self.log.last_index();
1466 while i >= 1 {
1467 match self.log.term_at(i) {
1468 Some(t) if t == term => return Some(i),
1469 Some(t) if t < term => return None,
1470 _ => {}
1471 }
1472 i -= 1;
1473 }
1474 None
1475 }
1476
1477 fn change_membership(
1487 &mut self,
1488 add: Option<NodeId>,
1489 remove: Option<NodeId>,
1490 ) -> Result<Vec<Action>> {
1491 if self.role != Role::Leader || self.transfer_target.is_some() {
1492 return Err(Error::NotLeader {
1493 leader: self.leader_id,
1494 });
1495 }
1496 if self.config_index > self.commit_index {
1498 return Err(Error::ConfigInProgress);
1499 }
1500
1501 let mut members = self.voters.clone();
1502 if let Some(id) = add {
1503 if !members.contains(&id) {
1504 members.push(id);
1505 }
1506 }
1507 if let Some(id) = remove {
1508 members.retain(|&m| m != id);
1509 }
1510 let members = sorted_members(members);
1511 if members == self.voters {
1512 return Ok(Vec::new()); }
1514
1515 let index = self.log.last_index() + 1;
1516 let entry = LogEntry::config(self.current_term, index, &members);
1517 self.log.append(core::slice::from_ref(&entry))?;
1518 self.log.sync()?;
1519
1520 let mut actions = Vec::new();
1521 self.set_config(members, index, &mut actions);
1524 self.replicate_to_all(&mut actions);
1525 self.advance_commit(&mut actions);
1526 Ok(actions)
1527 }
1528
1529 fn transfer_leadership(&mut self, target: NodeId) -> Result<Vec<Action>> {
1535 if self.role != Role::Leader || target == self.id || !self.voters.contains(&target) {
1536 return Ok(Vec::new());
1537 }
1538 self.transfer_target = Some(target);
1539 let mut actions = Vec::new();
1540 self.maybe_send_timeout_now(target, &mut actions);
1543 if self.transfer_target.is_some() {
1544 if let Some(i) = self.progress_index(target) {
1545 self.send_append(i, &mut actions);
1546 }
1547 }
1548 Ok(actions)
1549 }
1550
1551 fn maybe_send_timeout_now(&mut self, target: NodeId, actions: &mut Vec<Action>) {
1554 if self.transfer_target != Some(target) {
1555 return;
1556 }
1557 let caught_up = self
1558 .progress_index(target)
1559 .is_some_and(|i| self.progress[i].match_index >= self.log.last_index());
1560 if caught_up {
1561 self.transfer_target = None;
1562 actions.push(Action::Send {
1563 to: target,
1564 message: Message::TimeoutNow(TimeoutNow {
1565 term: self.current_term,
1566 leader: self.id,
1567 }),
1568 });
1569 }
1570 }
1571
1572 fn handle_timeout_now(&mut self, rpc: TimeoutNow, actions: &mut Vec<Action>) -> Result<()> {
1575 if rpc.term < self.current_term || !self.is_voter() {
1577 return Ok(());
1578 }
1579 self.start_election(true, actions)
1581 }
1582
1583 fn persist_hard_state(&mut self) -> Result<()> {
1586 self.log.set_hard_state(HardState {
1587 term: self.current_term,
1588 voted_for: self.voted_for,
1589 })?;
1590 self.log.sync()
1591 }
1592
1593 fn reset_election_timer(&mut self) {
1594 self.election_elapsed = 0;
1595 self.election_timeout = self
1596 .rng
1597 .gen_range(self.election_timeout_min, self.election_timeout_max);
1598 }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603 #![allow(clippy::unwrap_used, clippy::expect_used)]
1606
1607 use super::*;
1608
1609 fn drive_to_leader(node: &mut RaftNode) {
1610 for _ in 0..1_000 {
1611 if node.is_leader() {
1612 return;
1613 }
1614 let _ = node.step(Event::Tick).expect("tick");
1615 }
1616 panic!("node never became leader");
1617 }
1618
1619 #[test]
1620 fn test_single_node_elects_itself() {
1621 let mut node = RaftNode::new(RaftConfig::single(1));
1622 drive_to_leader(&mut node);
1623 assert_eq!(node.role(), Role::Leader);
1624 assert_eq!(node.leader(), Some(1));
1625 assert_eq!(node.term(), 1);
1626 }
1627
1628 #[test]
1629 fn test_single_node_commits_proposal() {
1630 let mut node = RaftNode::new(RaftConfig::single(1));
1631 drive_to_leader(&mut node);
1632 let actions = node.step(Event::Propose(b"a".to_vec())).unwrap();
1633 assert_eq!(node.commit_index(), 1);
1634 assert_eq!(node.last_applied(), 1);
1635 let applied: Vec<_> = actions
1636 .iter()
1637 .filter(|a| matches!(a, Action::Apply { .. }))
1638 .collect();
1639 assert_eq!(applied.len(), 1);
1640 }
1641
1642 #[test]
1643 fn test_propose_to_follower_is_rejected() {
1644 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1645 let err = node.step(Event::Propose(b"a".to_vec())).unwrap_err();
1646 assert!(matches!(err, Error::NotLeader { .. }));
1647 }
1648
1649 #[test]
1650 fn test_candidate_pre_votes_then_requests_votes_from_peers() {
1651 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1652 let mut sends = Vec::new();
1653 for _ in 0..1_000 {
1654 let actions = node.step(Event::Tick).unwrap();
1655 if !actions.is_empty() {
1656 sends = actions;
1657 break;
1658 }
1659 }
1660 assert_eq!(node.role(), Role::Follower);
1663 assert_eq!(node.term(), 0);
1664 let pre_targets: Vec<NodeId> = sends
1665 .iter()
1666 .filter_map(|a| match a {
1667 Action::Send {
1668 to,
1669 message: Message::PreVote(_),
1670 } => Some(*to),
1671 _ => None,
1672 })
1673 .collect();
1674 assert_eq!(pre_targets.len(), 2);
1675 assert!(pre_targets.contains(&2) && pre_targets.contains(&3));
1676
1677 let actions = node
1680 .step(Event::Message(Message::PreVoteReply(PreVoteReply {
1681 term: node.term(),
1682 vote_granted: true,
1683 from: 2,
1684 })))
1685 .unwrap();
1686 assert_eq!(node.role(), Role::Candidate);
1687 assert_eq!(node.term(), 1);
1688 let targets: Vec<NodeId> = actions
1689 .iter()
1690 .filter_map(|a| match a {
1691 Action::Send {
1692 to,
1693 message: Message::RequestVote(_),
1694 } => Some(*to),
1695 _ => None,
1696 })
1697 .collect();
1698 assert_eq!(targets.len(), 2);
1699 assert!(targets.contains(&2) && targets.contains(&3));
1700 }
1701
1702 #[test]
1703 fn test_pre_vote_does_not_advance_term_or_persist() {
1704 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(2, 2));
1708 for _ in 0..50 {
1709 let _ = node.step(Event::Tick).unwrap();
1710 }
1711 assert_eq!(node.role(), Role::Follower);
1712 assert_eq!(node.term(), 0);
1713 assert_eq!(node.log().hard_state().term, 0);
1714 }
1715
1716 #[test]
1717 fn test_pre_vote_granted_when_no_leader_and_log_ok() {
1718 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1719 let actions = node
1720 .step(Event::Message(Message::PreVote(PreVote {
1721 term: 1,
1722 candidate: 2,
1723 last_log_index: 0,
1724 last_log_term: 0,
1725 })))
1726 .unwrap();
1727 let granted = actions.iter().any(|a| {
1728 matches!(
1729 a,
1730 Action::Send { message: Message::PreVoteReply(r), .. } if r.vote_granted
1731 )
1732 });
1733 assert!(granted);
1734 assert_eq!(node.term(), 0);
1736 assert_eq!(node.log().hard_state().voted_for, None);
1737 }
1738
1739 #[test]
1740 fn test_pre_vote_denied_for_behind_log() {
1741 let mut log = MemoryLog::new();
1743 log.append(&[entry(2, 1)]).unwrap();
1744 let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
1745 let actions = node
1746 .step(Event::Message(Message::PreVote(PreVote {
1747 term: 1,
1748 candidate: 2,
1749 last_log_index: 0,
1750 last_log_term: 0,
1751 })))
1752 .unwrap();
1753 let granted = actions.iter().any(|a| {
1754 matches!(
1755 a,
1756 Action::Send { message: Message::PreVoteReply(r), .. } if r.vote_granted
1757 )
1758 });
1759 assert!(!granted);
1760 }
1761
1762 #[test]
1763 fn test_node_grants_one_vote_then_refuses_another_candidate() {
1764 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1765 let grant = |node: &mut RaftNode, candidate: NodeId| -> bool {
1766 let actions = node
1767 .step(Event::Message(Message::RequestVote(RequestVote {
1768 term: 5,
1769 candidate,
1770 last_log_index: 0,
1771 last_log_term: 0,
1772 force: false,
1773 })))
1774 .unwrap();
1775 actions.iter().any(|a| {
1776 matches!(
1777 a,
1778 Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
1779 )
1780 })
1781 };
1782 assert!(grant(&mut node, 2));
1783 assert!(!grant(&mut node, 3)); }
1785
1786 #[test]
1787 fn test_higher_term_message_steps_node_down() {
1788 let mut node = RaftNode::new(RaftConfig::single(1));
1789 drive_to_leader(&mut node);
1790 let leader_term = node.term();
1791 let _ = node
1792 .step(Event::Message(Message::AppendEntries(AppendEntries {
1793 term: leader_term + 5,
1794 leader: 9,
1795 prev_log_index: 0,
1796 prev_log_term: 0,
1797 entries: Vec::new(),
1798 leader_commit: 0,
1799 })))
1800 .unwrap();
1801 assert_eq!(node.role(), Role::Follower);
1802 assert_eq!(node.term(), leader_term + 5);
1803 assert_eq!(node.leader(), Some(9));
1804 }
1805
1806 #[test]
1807 fn test_stale_term_request_vote_is_refused() {
1808 let mut node = RaftNode::new(RaftConfig::single(1));
1809 drive_to_leader(&mut node); let term = node.term();
1811 let actions = node
1812 .step(Event::Message(Message::RequestVote(RequestVote {
1813 term: term - 1,
1814 candidate: 2,
1815 last_log_index: 99,
1816 last_log_term: 99,
1817 force: false,
1818 })))
1819 .unwrap();
1820 let granted = actions.iter().any(|a| {
1821 matches!(
1822 a,
1823 Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
1824 )
1825 });
1826 assert!(!granted);
1827 }
1828
1829 #[test]
1830 fn test_heartbeat_resets_follower_election_timer() {
1831 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5));
1832 let _ = node.step(Event::Tick).unwrap();
1834 let _ = node.step(Event::Tick).unwrap();
1835 let _ = node
1836 .step(Event::Message(Message::AppendEntries(AppendEntries {
1837 term: 1,
1838 leader: 2,
1839 prev_log_index: 0,
1840 prev_log_term: 0,
1841 entries: Vec::new(),
1842 leader_commit: 0,
1843 })))
1844 .unwrap();
1845 assert_eq!(node.role(), Role::Follower);
1846 assert_eq!(node.leader(), Some(2));
1847 let _ = node.step(Event::Tick).unwrap();
1849 assert_eq!(node.role(), Role::Follower);
1850 }
1851
1852 fn elect_multi_node_leader() -> RaftNode {
1856 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_heartbeat_interval(2));
1857 for _ in 0..1_000 {
1858 let actions = node.step(Event::Tick).expect("tick");
1859 if !actions.is_empty() {
1860 break; }
1862 }
1863 let _ = node
1865 .step(Event::Message(Message::PreVoteReply(PreVoteReply {
1866 term: node.term(),
1867 vote_granted: true,
1868 from: 2,
1869 })))
1870 .expect("pre-vote reply");
1871 assert_eq!(node.role(), Role::Candidate);
1872 let term = node.term();
1873 let _ = node
1874 .step(Event::Message(Message::RequestVoteReply(
1875 RequestVoteReply {
1876 term,
1877 vote_granted: true,
1878 from: 2,
1879 },
1880 )))
1881 .expect("vote reply");
1882 assert_eq!(node.role(), Role::Leader);
1883 node
1884 }
1885
1886 #[test]
1887 fn test_vote_replies_elect_a_multi_node_leader() {
1888 let node = elect_multi_node_leader();
1889 assert_eq!(node.leader(), Some(1));
1890 }
1891
1892 #[test]
1893 fn test_leader_emits_heartbeats_on_interval() {
1894 let mut node = elect_multi_node_leader();
1895 let first = node.step(Event::Tick).unwrap();
1897 assert!(first.is_empty());
1898 let second = node.step(Event::Tick).unwrap();
1899 let heartbeats = second
1900 .iter()
1901 .filter(|a| {
1902 matches!(
1903 a,
1904 Action::Send {
1905 message: Message::AppendEntries(_),
1906 ..
1907 }
1908 )
1909 })
1910 .count();
1911 assert_eq!(heartbeats, 2);
1912 }
1913
1914 #[test]
1915 fn test_persisted_hard_state_is_restored() {
1916 let mut log = MemoryLog::new();
1917 log.set_hard_state(HardState {
1918 term: 7,
1919 voted_for: Some(3),
1920 })
1921 .unwrap();
1922 let node = RaftNode::with_log(RaftConfig::single(1), log);
1923 assert_eq!(node.term(), 7);
1924 }
1925
1926 #[test]
1927 fn test_vote_is_persisted_to_log() {
1928 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]));
1929 let _ = node
1930 .step(Event::Message(Message::RequestVote(RequestVote {
1931 term: 4,
1932 candidate: 2,
1933 last_log_index: 0,
1934 last_log_term: 0,
1935 force: false,
1936 })))
1937 .unwrap();
1938 assert_eq!(
1939 node.log().hard_state(),
1940 HardState {
1941 term: 4,
1942 voted_for: Some(2)
1943 }
1944 );
1945 }
1946
1947 fn entry(term: Term, index: Index) -> LogEntry {
1950 LogEntry::new(term, index, vec![index as u8])
1951 }
1952
1953 fn first_append_entries(actions: &[Action], to: NodeId) -> AppendEntries {
1954 actions
1955 .iter()
1956 .find_map(|a| match a {
1957 Action::Send {
1958 to: dst,
1959 message: Message::AppendEntries(ae),
1960 } if *dst == to => Some(ae.clone()),
1961 _ => None,
1962 })
1963 .expect("an AppendEntries to the peer")
1964 }
1965
1966 #[test]
1969 fn test_leader_replicates_and_commits_on_quorum() {
1970 let mut node = elect_multi_node_leader();
1971 let _ = node
1973 .step(Event::Message(Message::AppendEntriesReply(
1974 AppendEntriesReply {
1975 term: node.term(),
1976 success: true,
1977 from: 2,
1978 match_index: 0,
1979 conflict_index: 0,
1980 conflict_term: 0,
1981 },
1982 )))
1983 .unwrap();
1984
1985 let actions = node.step(Event::Propose(b"x".to_vec())).unwrap();
1987 assert_eq!(node.commit_index(), 0);
1988 let ae = first_append_entries(&actions, 2);
1989 assert_eq!(ae.entries.len(), 1);
1990 assert_eq!(ae.entries[0].index, 1);
1991
1992 let applied = node
1994 .step(Event::Message(Message::AppendEntriesReply(
1995 AppendEntriesReply {
1996 term: node.term(),
1997 success: true,
1998 from: 2,
1999 match_index: 1,
2000 conflict_index: 0,
2001 conflict_term: 0,
2002 },
2003 )))
2004 .unwrap();
2005 assert_eq!(node.commit_index(), 1);
2006 assert!(
2007 applied
2008 .iter()
2009 .any(|a| matches!(a, Action::Apply { index: 1, .. }))
2010 );
2011 }
2012
2013 #[test]
2014 fn test_follower_appends_streamed_entries() {
2015 let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2016 let actions = node
2017 .step(Event::Message(Message::AppendEntries(AppendEntries {
2018 term: 2,
2019 leader: 1,
2020 prev_log_index: 0,
2021 prev_log_term: 0,
2022 entries: vec![entry(2, 1), entry(2, 2)],
2023 leader_commit: 2,
2024 })))
2025 .unwrap();
2026 assert_eq!(node.log().last_index(), 2);
2027 assert_eq!(node.commit_index(), 2);
2028 let reply = actions
2029 .iter()
2030 .find_map(|a| match a {
2031 Action::Send {
2032 message: Message::AppendEntriesReply(r),
2033 ..
2034 } => Some(r.clone()),
2035 _ => None,
2036 })
2037 .expect("a reply");
2038 assert!(reply.success);
2039 assert_eq!(reply.match_index, 2);
2040 }
2041
2042 #[test]
2043 fn test_follower_truncates_divergent_tail() {
2044 let mut log = MemoryLog::new();
2046 log.append(&[entry(1, 1), entry(2, 2)]).unwrap();
2047 let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
2048
2049 let actions = node
2050 .step(Event::Message(Message::AppendEntries(AppendEntries {
2051 term: 3,
2052 leader: 1,
2053 prev_log_index: 1,
2054 prev_log_term: 1,
2055 entries: vec![entry(3, 2)],
2056 leader_commit: 0,
2057 })))
2058 .unwrap();
2059 assert_eq!(node.log().last_index(), 2);
2060 assert_eq!(node.log().entry(2).unwrap().term, 3);
2061 let reply = first_reply(&actions);
2062 assert!(reply.success);
2063 assert_eq!(reply.match_index, 2);
2064 }
2065
2066 #[test]
2067 fn test_follower_rejects_short_log_with_length_hint() {
2068 let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2069 let actions = node
2070 .step(Event::Message(Message::AppendEntries(AppendEntries {
2071 term: 2,
2072 leader: 1,
2073 prev_log_index: 3,
2074 prev_log_term: 1,
2075 entries: vec![entry(2, 4)],
2076 leader_commit: 0,
2077 })))
2078 .unwrap();
2079 let reply = first_reply(&actions);
2080 assert!(!reply.success);
2081 assert_eq!(reply.conflict_index, 1); assert_eq!(reply.conflict_term, 0);
2083 }
2084
2085 #[test]
2086 fn test_follower_rejects_term_mismatch_with_term_hint() {
2087 let mut log = MemoryLog::new();
2089 log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2090 .unwrap();
2091 let mut node = RaftNode::with_log(RaftConfig::new(5, [1]), log);
2092
2093 let actions = node
2094 .step(Event::Message(Message::AppendEntries(AppendEntries {
2095 term: 5,
2096 leader: 1,
2097 prev_log_index: 3,
2098 prev_log_term: 4, entries: Vec::new(),
2100 leader_commit: 0,
2101 })))
2102 .unwrap();
2103 let reply = first_reply(&actions);
2104 assert!(!reply.success);
2105 assert_eq!(reply.conflict_term, 1);
2106 assert_eq!(reply.conflict_index, 1); }
2108
2109 #[test]
2110 fn test_rejection_backtracks_then_converges() {
2111 let mut log = MemoryLog::new();
2113 log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2114 .unwrap();
2115 log.set_hard_state(HardState {
2116 term: 1,
2117 voted_for: Some(1),
2118 })
2119 .unwrap();
2120 let mut leader =
2121 RaftNode::with_log(RaftConfig::new(1, [2]).with_election_timeout(5, 5), log);
2122 let mut follower = RaftNode::new(RaftConfig::new(2, [1]));
2123
2124 let mut pending = Vec::new();
2127 for _ in 0..50 {
2128 let acts = leader.step(Event::Tick).unwrap();
2129 if !acts.is_empty() {
2130 pending = acts;
2131 break;
2132 }
2133 }
2134 let _ = leader
2135 .step(Event::Message(Message::PreVoteReply(PreVoteReply {
2136 term: leader.term(),
2137 vote_granted: true,
2138 from: 2,
2139 })))
2140 .unwrap();
2141 let _ = leader
2143 .step(Event::Message(Message::RequestVoteReply(
2144 RequestVoteReply {
2145 term: leader.term(),
2146 vote_granted: true,
2147 from: 2,
2148 },
2149 )))
2150 .unwrap();
2151 assert!(leader.is_leader());
2152 let _ = pending;
2153
2154 let mut queue: Vec<(NodeId, Message)> = drain_sends(&mut leader);
2156 for _ in 0..100 {
2157 if follower.log().last_index() == 3 {
2158 break;
2159 }
2160 let mut next = Vec::new();
2161 for (to, msg) in queue.drain(..) {
2162 let acts = if to == 2 {
2163 follower.step(Event::Message(msg)).unwrap()
2164 } else {
2165 leader.step(Event::Message(msg)).unwrap()
2166 };
2167 next.extend(collect_sends(acts));
2168 }
2169 queue = next;
2170 if queue.is_empty() {
2171 queue = leader
2172 .step(Event::Tick)
2173 .unwrap()
2174 .into_iter()
2175 .filter_map(send_pair)
2176 .collect();
2177 }
2178 }
2179 assert_eq!(follower.log().last_index(), 3);
2180 assert_eq!(follower.log().entry(3).unwrap().term, 1);
2181 }
2182
2183 fn first_reply(actions: &[Action]) -> AppendEntriesReply {
2184 actions
2185 .iter()
2186 .find_map(|a| match a {
2187 Action::Send {
2188 message: Message::AppendEntriesReply(r),
2189 ..
2190 } => Some(r.clone()),
2191 _ => None,
2192 })
2193 .expect("an AppendEntriesReply")
2194 }
2195
2196 fn send_pair(a: Action) -> Option<(NodeId, Message)> {
2197 match a {
2198 Action::Send { to, message } => Some((to, message)),
2199 _ => None,
2200 }
2201 }
2202
2203 fn collect_sends(actions: Vec<Action>) -> Vec<(NodeId, Message)> {
2204 actions.into_iter().filter_map(send_pair).collect()
2205 }
2206
2207 fn drain_sends(node: &mut RaftNode) -> Vec<(NodeId, Message)> {
2208 let acts = node.step(Event::Tick).unwrap();
2209 collect_sends(acts)
2210 }
2211
2212 #[derive(Default)]
2217 struct SyncCountLog {
2218 inner: MemoryLog,
2219 syncs: std::cell::Cell<u32>,
2220 }
2221
2222 impl SyncCountLog {
2223 fn syncs(&self) -> u32 {
2224 self.syncs.get()
2225 }
2226 }
2227
2228 impl RaftLog for SyncCountLog {
2229 fn last_index(&self) -> Index {
2230 self.inner.last_index()
2231 }
2232 fn last_term(&self) -> Term {
2233 self.inner.last_term()
2234 }
2235 fn term_at(&self, index: Index) -> Option<Term> {
2236 self.inner.term_at(index)
2237 }
2238 fn entry(&self, index: Index) -> Option<LogEntry> {
2239 self.inner.entry(index)
2240 }
2241 fn append(&mut self, entries: &[LogEntry]) -> Result<()> {
2242 self.inner.append(entries)
2243 }
2244 fn truncate(&mut self, from: Index) -> Result<()> {
2245 self.inner.truncate(from)
2246 }
2247 fn hard_state(&self) -> HardState {
2248 self.inner.hard_state()
2249 }
2250 fn set_hard_state(&mut self, state: HardState) -> Result<()> {
2251 self.inner.set_hard_state(state)
2252 }
2253 fn sync(&mut self) -> Result<()> {
2254 self.syncs.set(self.syncs.get() + 1);
2255 self.inner.sync()
2256 }
2257 }
2258
2259 #[test]
2260 fn test_granting_a_vote_persists_and_syncs_before_replying() {
2261 let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), SyncCountLog::default());
2262 let actions = node
2263 .step(Event::Message(Message::RequestVote(RequestVote {
2264 term: 4,
2265 candidate: 2,
2266 last_log_index: 0,
2267 last_log_term: 0,
2268 force: false,
2269 })))
2270 .unwrap();
2271 assert!(actions.iter().any(|a| matches!(
2273 a,
2274 Action::Send { message: Message::RequestVoteReply(r), .. } if r.vote_granted
2275 )));
2276 assert!(
2278 node.log().syncs() >= 1,
2279 "vote must be synced before the reply"
2280 );
2281 assert_eq!(node.log().hard_state().voted_for, Some(2));
2282 }
2283
2284 #[test]
2287 fn test_snapshot_hint_then_compaction() {
2288 let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(2));
2290 drive_to_leader(&mut node);
2291
2292 let mut hint = None;
2293 for _ in 0..4 {
2294 let actions = node.step(Event::Propose(b"c".to_vec())).unwrap();
2295 if let Some(Action::Snapshot { index, term }) = actions
2296 .iter()
2297 .find(|a| matches!(a, Action::Snapshot { .. }))
2298 .cloned()
2299 {
2300 hint = Some((index, term));
2301 break;
2302 }
2303 }
2304 let (index, _term) = hint.expect("a snapshot hint once the log grew");
2305 assert!(index >= 2);
2306
2307 let _ = node
2309 .step(Event::Snapshot {
2310 index,
2311 data: b"state".to_vec(),
2312 })
2313 .unwrap();
2314 assert_eq!(node.log().snapshot_index(), index);
2315 assert_eq!(node.log().entry(1), None); assert_eq!(node.commit_index(), node.commit_index()); }
2318
2319 #[test]
2320 fn test_snapshot_event_rejects_uncommitted_index() {
2321 let mut node = RaftNode::new(RaftConfig::single(1).with_snapshot_threshold(0));
2322 drive_to_leader(&mut node);
2323 let _ = node.step(Event::Propose(b"c".to_vec())).unwrap(); let _ = node
2326 .step(Event::Snapshot {
2327 index: 99,
2328 data: vec![],
2329 })
2330 .unwrap();
2331 assert_eq!(node.log().snapshot_index(), 0);
2332 }
2333
2334 #[test]
2335 fn test_leader_sends_install_snapshot_when_follower_is_behind() {
2336 let mut log = MemoryLog::new();
2339 log.append(&[entry(1, 1), entry(1, 2), entry(1, 3)])
2340 .unwrap();
2341 log.apply_snapshot(&Snapshot::new(2, 1, b"snap".to_vec()))
2342 .unwrap();
2343 log.set_hard_state(HardState {
2344 term: 1,
2345 voted_for: Some(1),
2346 })
2347 .unwrap();
2348 let mut node =
2349 RaftNode::with_log(RaftConfig::new(1, [2, 3]).with_election_timeout(5, 5), log);
2350 let mut elected = false;
2353 for _ in 0..50 {
2354 let _ = node.step(Event::Tick).unwrap();
2355 let _ = node
2356 .step(Event::Message(Message::PreVoteReply(PreVoteReply {
2357 term: node.term(),
2358 vote_granted: true,
2359 from: 2,
2360 })))
2361 .unwrap();
2362 if node.role() == Role::Candidate {
2363 let _ = node
2364 .step(Event::Message(Message::RequestVoteReply(
2365 RequestVoteReply {
2366 term: node.term(),
2367 vote_granted: true,
2368 from: 2,
2369 },
2370 )))
2371 .unwrap();
2372 }
2373 if node.is_leader() {
2374 elected = true;
2375 break;
2376 }
2377 }
2378 assert!(elected);
2379 let actions = node
2382 .step(Event::Message(Message::AppendEntriesReply(
2383 AppendEntriesReply {
2384 term: node.term(),
2385 success: false,
2386 from: 2,
2387 match_index: 0,
2388 conflict_index: 1, conflict_term: 0,
2390 },
2391 )))
2392 .unwrap();
2393 assert!(actions.iter().any(|a| matches!(
2395 a,
2396 Action::Send {
2397 to: 2,
2398 message: Message::InstallSnapshot(_)
2399 }
2400 )));
2401 }
2402
2403 #[test]
2404 fn test_follower_installs_snapshot_and_restores() {
2405 let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2406 let actions = node
2407 .step(Event::Message(Message::InstallSnapshot(InstallSnapshot {
2408 term: 3,
2409 leader: 1,
2410 snapshot: Snapshot::new(8, 2, b"the state".to_vec()),
2411 })))
2412 .unwrap();
2413 assert_eq!(node.log().snapshot_index(), 8);
2414 assert_eq!(node.commit_index(), 8);
2415 assert!(
2417 actions
2418 .iter()
2419 .any(|a| matches!(a, Action::RestoreSnapshot { index: 8, .. }))
2420 );
2421 assert!(actions.iter().any(|a| matches!(
2422 a,
2423 Action::Send { message: Message::InstallSnapshotReply(r), .. } if r.last_index == 8
2424 )));
2425 }
2426
2427 #[test]
2428 fn test_node_recovers_applied_position_from_snapshot() {
2429 let mut log = MemoryLog::new();
2432 log.apply_snapshot(&Snapshot::new(6, 2, b"s".to_vec()))
2433 .unwrap();
2434 let node = RaftNode::with_log(RaftConfig::single(1), log);
2435 assert_eq!(node.commit_index(), 6);
2436 assert_eq!(node.last_applied(), 6);
2437 }
2438
2439 #[test]
2440 fn test_rejected_vote_makes_no_durable_write() {
2441 let mut log = SyncCountLog::default();
2444 log.set_hard_state(HardState {
2445 term: 5,
2446 voted_for: Some(9),
2447 })
2448 .unwrap();
2449 let mut node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
2450 let before = node.log().syncs();
2451 let _ = node
2452 .step(Event::Message(Message::RequestVote(RequestVote {
2453 term: 3, candidate: 2,
2455 last_log_index: 0,
2456 last_log_term: 0,
2457 force: false,
2458 })))
2459 .unwrap();
2460 assert_eq!(node.log().syncs(), before, "a no-op vote must not sync");
2461 }
2462
2463 fn membership_changed(actions: &[Action]) -> Option<Vec<NodeId>> {
2466 actions.iter().find_map(|a| match a {
2467 Action::MembershipChanged { members } => Some(members.clone()),
2468 _ => None,
2469 })
2470 }
2471
2472 #[test]
2473 fn test_node_reports_bootstrap_membership() {
2474 let node = RaftNode::new(RaftConfig::new(1, [3, 2]));
2475 assert_eq!(node.members(), &[1, 2, 3]); }
2477
2478 #[test]
2479 fn test_add_server_adopts_config_immediately() {
2480 let mut node = RaftNode::new(RaftConfig::single(1));
2481 drive_to_leader(&mut node);
2482 let actions = node.step(Event::AddServer(2)).unwrap();
2483 assert_eq!(node.members(), &[1, 2]);
2484 assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
2485 let last = node.log().last_index();
2487 assert_eq!(node.log().entry(last).unwrap().members(), Some(vec![1, 2]));
2488 }
2489
2490 #[test]
2491 fn test_remove_server_adopts_config() {
2492 let mut node = elect_multi_node_leader(); let actions = node.step(Event::RemoveServer(3)).unwrap();
2494 assert_eq!(node.members(), &[1, 2]);
2495 assert_eq!(membership_changed(&actions), Some(vec![1, 2]));
2496 }
2497
2498 #[test]
2499 fn test_add_existing_member_is_noop() {
2500 let mut node = elect_multi_node_leader();
2501 let actions = node.step(Event::AddServer(2)).unwrap();
2502 assert!(actions.is_empty());
2503 assert_eq!(node.members(), &[1, 2, 3]);
2504 }
2505
2506 #[test]
2507 fn test_one_config_change_at_a_time() {
2508 let mut node = RaftNode::new(RaftConfig::single(1));
2511 drive_to_leader(&mut node);
2512 let _ = node.step(Event::AddServer(2)).unwrap();
2513 let err = node.step(Event::AddServer(3)).unwrap_err();
2514 assert!(matches!(err, Error::ConfigInProgress));
2515 }
2516
2517 #[test]
2518 fn test_change_membership_rejected_on_follower() {
2519 let mut node = RaftNode::new(RaftConfig::new(2, [1, 3]));
2520 let err = node.step(Event::AddServer(4)).unwrap_err();
2521 assert!(matches!(err, Error::NotLeader { .. }));
2522 }
2523
2524 #[test]
2525 fn test_membership_recovered_from_config_entry() {
2526 let mut log = MemoryLog::new();
2528 log.append(&[
2529 LogEntry::new(1, 1, b"x".to_vec()),
2530 LogEntry::config(1, 2, &[1, 2, 3, 4]),
2531 ])
2532 .unwrap();
2533 let node = RaftNode::with_log(RaftConfig::new(1, [2, 3]), log);
2534 assert_eq!(node.members(), &[1, 2, 3, 4]);
2535 }
2536
2537 #[test]
2538 fn test_membership_recovered_from_snapshot_config() {
2539 let mut log = MemoryLog::new();
2540 log.apply_snapshot(&Snapshot::with_config(
2541 5,
2542 2,
2543 vec![1, 2, 3, 4, 5],
2544 b"s".to_vec(),
2545 ))
2546 .unwrap();
2547 let node = RaftNode::with_log(RaftConfig::single(1), log);
2548 assert_eq!(node.members(), &[1, 2, 3, 4, 5]);
2549 }
2550
2551 #[test]
2552 fn test_follower_adopts_config_from_append() {
2553 let mut node = RaftNode::new(RaftConfig::new(5, [1]));
2554 let actions = node
2555 .step(Event::Message(Message::AppendEntries(AppendEntries {
2556 term: 2,
2557 leader: 1,
2558 prev_log_index: 0,
2559 prev_log_term: 0,
2560 entries: vec![LogEntry::config(2, 1, &[1, 5, 9])],
2561 leader_commit: 0,
2562 })))
2563 .unwrap();
2564 assert_eq!(node.members(), &[1, 5, 9]);
2565 assert_eq!(membership_changed(&actions), Some(vec![1, 5, 9]));
2566 }
2567
2568 #[test]
2571 fn test_timeout_now_triggers_immediate_election() {
2572 let mut node = RaftNode::new(RaftConfig::new(1, [2, 3]).with_election_timeout(1000, 1000));
2573 let actions = node
2575 .step(Event::Message(Message::TimeoutNow(TimeoutNow {
2576 term: 0,
2577 leader: 2,
2578 })))
2579 .unwrap();
2580 assert_eq!(node.role(), Role::Candidate);
2581 assert!(actions.iter().any(|a| matches!(
2582 a,
2583 Action::Send {
2584 message: Message::RequestVote(_),
2585 ..
2586 }
2587 )));
2588 }
2589
2590 #[test]
2591 fn test_transfer_to_caught_up_follower_sends_timeout_now() {
2592 let mut node = elect_multi_node_leader(); let _ = node
2595 .step(Event::Message(Message::AppendEntriesReply(
2596 AppendEntriesReply {
2597 term: node.term(),
2598 success: true,
2599 from: 2,
2600 match_index: 0,
2601 conflict_index: 0,
2602 conflict_term: 0,
2603 },
2604 )))
2605 .unwrap();
2606 let actions = node.step(Event::TransferLeadership(2)).unwrap();
2607 assert!(actions.iter().any(|a| matches!(
2608 a,
2609 Action::Send {
2610 to: 2,
2611 message: Message::TimeoutNow(_)
2612 }
2613 )));
2614 }
2615
2616 #[test]
2617 fn test_transfer_to_non_voter_is_noop() {
2618 let mut node = elect_multi_node_leader();
2619 let actions = node.step(Event::TransferLeadership(99)).unwrap();
2620 assert!(actions.is_empty());
2621 }
2622
2623 #[test]
2624 fn test_non_voter_does_not_start_election() {
2625 let mut log = MemoryLog::new();
2627 log.append(&[LogEntry::config(1, 1, &[1, 2, 3])]).unwrap(); let mut node = RaftNode::with_log(
2629 RaftConfig::new(5, [1, 2, 3]).with_election_timeout(2, 2),
2630 log,
2631 );
2632 assert_eq!(node.members(), &[1, 2, 3]);
2633 for _ in 0..50 {
2634 let _ = node.step(Event::Tick).unwrap();
2635 }
2636 assert_eq!(node.role(), Role::Follower);
2637 }
2638}