use super::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
/// A real vote request whose LSN (90) is below the watermark passed to
/// `consider` (100) must be refused with `WatermarkNotCovered`, echoing both
/// values back in the refusal reason.
#[test]
fn watermark_uncovered_candidate_is_refused() {
    let expected = VoteDecision::Refused(RefusalReason::WatermarkNotCovered {
        candidate_lsn: 90,
        watermark: 100,
    });

    let lagging = VoteRequest::real("cand", 5, 90);
    let voter = Voter::new("v", MemoryLastVoteStore::new());

    assert_eq!(voter.consider(&lagging, 100).unwrap(), expected);
}
/// A real vote request whose LSN (120) is above the watermark (100) is
/// granted by a voter that starts from an empty in-memory store.
#[test]
fn watermark_covered_candidate_is_granted() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());
    let covering = VoteRequest::real("cand", 5, 120);

    let decision = voter.consider(&covering, 100).unwrap();

    assert_eq!(decision, VoteDecision::Granted);
}
/// A very large term (9999) does not rescue a candidate whose LSN (10) is
/// below the watermark (100): the refusal is still `WatermarkNotCovered`.
#[test]
fn watermark_rule_beats_a_newer_term() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());
    let high_term_low_lsn = VoteRequest::real("cand", 9999, 10);

    let decision = voter.consider(&high_term_low_lsn, 100).unwrap();

    assert!(matches!(
        decision,
        VoteDecision::Refused(RefusalReason::WatermarkNotCovered { .. })
    ));
}
/// Once the voter has granted candidate "a" in term 7, a request from a
/// different candidate "b" in the same term is refused with `AlreadyVoted`,
/// naming the term and the candidate that already holds the vote.
#[test]
fn cannot_double_vote_for_two_candidates_in_same_term() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());

    let first = voter
        .consider(&VoteRequest::real("a", 7, 200), 100)
        .unwrap();
    assert_eq!(first, VoteDecision::Granted);

    let second = voter
        .consider(&VoteRequest::real("b", 7, 200), 100)
        .unwrap();
    let already_voted = VoteDecision::Refused(RefusalReason::AlreadyVoted {
        term: 7,
        voted_for: "a".to_string(),
    });
    assert_eq!(second, already_voted);
}
/// The same candidate asking twice in the same term gets a grant both times.
#[test]
fn re_ask_by_same_candidate_in_same_term_is_idempotently_granted() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());

    // Two identical asks in a row; each must be granted on its own.
    for _ in 0..2 {
        let decision = voter
            .consider(&VoteRequest::real("a", 7, 200), 100)
            .unwrap();
        assert!(decision.is_granted());
    }
}
/// A vote given to "a" in term 7 does not block a grant to "b" in term 8, and
/// afterwards the voter reports 8 as its current term.
#[test]
fn newer_term_clears_the_previous_terms_vote() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());

    let term_seven = voter
        .consider(&VoteRequest::real("a", 7, 200), 100)
        .unwrap();
    assert!(term_seven.is_granted());

    let term_eight = voter
        .consider(&VoteRequest::real("b", 8, 200), 100)
        .unwrap();
    assert!(term_eight.is_granted());

    assert_eq!(voter.current_term().unwrap(), 8);
}
/// A voter whose store is seeded at term 10 refuses a candidate asking in
/// term 6 with `StaleTerm`, reporting both the candidate's and its own term.
#[test]
fn stale_term_candidate_is_refused() {
    let seeded = MemoryLastVoteStore::seeded(LastVote {
        term: 10,
        voted_for: Some("x".to_string()),
    });
    let voter = Voter::new("v", seeded);

    let stale = VoteRequest::real("old", 6, 200);
    let decision = voter.consider(&stale, 100).unwrap();

    let expected = VoteDecision::Refused(RefusalReason::StaleTerm {
        candidate_term: 6,
        voter_term: 10,
    });
    assert_eq!(decision, expected);
}
/// A probe request for "a" in term 7 is granted but leaves the voter's term
/// at 0, and a following real request from "b" in that same term is still
/// granted, so the probe recorded no vote.
#[test]
fn dry_run_probe_does_not_persist_or_advance_term() {
    let voter = Voter::new("v", MemoryLastVoteStore::new());

    let probe = VoteRequest::probe("a", 7, 200);
    assert!(voter.consider(&probe, 100).unwrap().is_granted());
    assert_eq!(voter.current_term().unwrap(), 0);

    let real = VoteRequest::real("b", 7, 200);
    assert!(voter.consider(&real, 100).unwrap().is_granted());
}
/// A `FileLastVoteStore` opened on a freshly named path loads as
/// `LastVote::default()`, and a vote persisted through one handle is read
/// back unchanged by a second handle opened on the same path.
#[test]
fn file_store_round_trips_and_defaults_to_empty() {
    /// Deletes the scratch directory on drop, so a failing assertion (which
    /// unwinds past the end of the test) cannot leave it behind in the
    /// system temp directory.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Declared first so it is dropped last, after every store handle below.
    let scratch = ScratchDir(std::env::temp_dir().join(format!(
        "reddb-lastvote-{}-{}",
        std::process::id(),
        crate::utils::now_unix_nanos()
    )));
    let path = scratch.0.join("node.lastvote.json");

    let store = FileLastVoteStore::new(&path);
    assert_eq!(store.load().unwrap(), LastVote::default());

    store
        .persist(&LastVote {
            term: 12,
            voted_for: Some("cand-7".to_string()),
        })
        .unwrap();

    // A second handle on the same path must see what the first one wrote.
    let reopened = FileLastVoteStore::new(&path);
    assert_eq!(
        reopened.load().unwrap(),
        LastVote {
            term: 12,
            voted_for: Some("cand-7".to_string()),
        }
    );
}
/// Simulates a voter restart: a first `Voter` backed by a file store grants
/// "a" in term 7 and is dropped; a second `Voter` built on the same file
/// then refuses "b" in term 7 with `AlreadyVoted`.
#[test]
fn restarted_voter_does_not_double_vote_in_a_term() {
    /// Deletes the scratch directory on drop, so a failing assertion (which
    /// unwinds past the end of the test) cannot leave it behind in the
    /// system temp directory.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Declared first so it is dropped last, after both voters are gone.
    let scratch = ScratchDir(std::env::temp_dir().join(format!(
        "reddb-lastvote-restart-{}-{}",
        std::process::id(),
        crate::utils::now_unix_nanos()
    )));
    let path = scratch.0.join("node.lastvote.json");

    // First incarnation: grants "a" in term 7, then goes out of scope.
    {
        let voter = Voter::new("v", FileLastVoteStore::new(&path));
        assert!(voter
            .consider(&VoteRequest::real("a", 7, 200), 100)
            .unwrap()
            .is_granted());
    }

    // Second incarnation on the same file: must refuse "b" in term 7.
    {
        let voter = Voter::new("v", FileLastVoteStore::new(&path));
        assert_eq!(
            voter
                .consider(&VoteRequest::real("b", 7, 200), 100)
                .unwrap(),
            VoteDecision::Refused(RefusalReason::AlreadyVoted {
                term: 7,
                voted_for: "a".to_string(),
            }),
        );
    }
}
/// `quorum_threshold` is expected to be 2 for three data voters, 2 for two
/// data voters plus a witness, and still 2 when a catching-up replica is
/// added to three data voters.
#[test]
fn quorum_threshold_is_strict_majority_of_voting_members() {
    let three: Vec<Member> = ["a", "b", "c"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    assert_eq!(quorum_threshold(&three), 2);

    let mut two_plus_witness: Vec<Member> = ["a", "b"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    two_plus_witness.push(Member::witness("w"));
    assert_eq!(quorum_threshold(&two_plus_witness), 2);

    // Same three voters as above, plus one replica that is still catching up.
    let mut with_catch_up = three.clone();
    with_catch_up.push(Member::data_catching_up("d"));
    assert_eq!(quorum_threshold(&with_catch_up), 2, "3 voters, not 4");
}
/// Only a voting data member reports itself as electable; a witness and a
/// catching-up data replica do not.
#[test]
fn electability_excludes_witness_and_catching_up() {
    let data = Member::data_voting("a");
    let witness = Member::witness("w");
    let catching_up = Member::data_catching_up("d");

    assert!(data.is_electable());
    assert!(!witness.is_electable(), "witness holds no data");
    assert!(
        !catching_up.is_electable(),
        "catch-up replica is not healthy"
    );
}
/// For seeds 0..1000 the randomized timeout stays in `[base, base + jitter)`.
/// Seeds 10 and 11 must give different values, and zero jitter must give
/// exactly `base`.
#[test]
fn randomized_timeout_stays_within_band() {
    let base = Duration::from_millis(150);
    let jitter = Duration::from_millis(150);
    let upper = base + jitter;

    (0..1000u64)
        .map(|seed| randomized_election_timeout(base, jitter, seed))
        .for_each(|timeout| {
            assert!(timeout >= base, "never below base");
            assert!(timeout < upper, "never at or beyond base+jitter");
        });

    let from_seed_10 = randomized_election_timeout(base, jitter, 10);
    let from_seed_11 = randomized_election_timeout(base, jitter, 11);
    assert_ne!(from_seed_10, from_seed_11);

    let no_jitter = randomized_election_timeout(base, Duration::ZERO, 999);
    assert_eq!(no_jitter, base);
}
/// In-memory `ElectionTransport` used by the election tests: it answers vote
/// requests from per-peer state held in this struct and records the terms
/// passed to `bump_term` / `promote` so tests can assert on them.
struct ClusterTransport {
/// Membership list returned (cloned) by `members()`.
members: Vec<Member>,
/// Per-peer simulated state, keyed by member id.
peers: HashMap<String, PeerState>,
/// Fake clock reading; advanced by `tick` on every `elapsed()` call.
elapsed_steps: Rc<RefCell<Duration>>,
/// Amount the fake clock moves forward per `elapsed()` call.
tick: Duration,
/// Last term passed to `bump_term`, or `None` if it was never called.
bumped_term: Option<u64>,
/// Last term passed to `promote`, or `None` if it was never called.
promoted_term: Option<u64>,
}
/// Simulated state of one cluster peer as seen by `ClusterTransport`.
struct PeerState {
/// Last-vote store the peer's voter reads and writes; behind `Rc` so tests
/// can hand the same store to more than one transport.
store: Rc<MemoryLastVoteStore>,
/// Watermark passed to the voter when this peer considers a request.
watermark: u64,
/// When `false`, `request_vote` returns a refusal without consulting the store.
reachable: bool,
}
impl ClusterTransport {
fn new(members: Vec<Member>) -> Self {
let mut peers = HashMap::new();
for m in &members {
peers.insert(
m.id.clone(),
PeerState {
store: Rc::new(MemoryLastVoteStore::new()),
watermark: 0,
reachable: true,
},
);
}
Self {
members,
peers,
elapsed_steps: Rc::new(RefCell::new(Duration::ZERO)),
tick: Duration::from_millis(10),
bumped_term: None,
promoted_term: None,
}
}
fn with_watermark(mut self, watermark: u64) -> Self {
for p in self.peers.values_mut() {
p.watermark = watermark;
}
self
}
fn partition_away(&mut self, peer_id: &str) {
if let Some(p) = self.peers.get_mut(peer_id) {
p.reachable = false;
}
}
fn share_store(&mut self, peer_id: &str, store: Rc<MemoryLastVoteStore>) {
if let Some(p) = self.peers.get_mut(peer_id) {
p.store = store;
}
}
}
impl ElectionTransport for ClusterTransport {
    /// Returns a copy of the configured membership list.
    fn members(&self) -> Vec<Member> {
        self.members.clone()
    }

    /// Answers a vote request on behalf of `peer_id`.
    ///
    /// A reachable peer runs the request through a voter backed by its store
    /// and watermark. An unreachable peer is modelled as a `StaleTerm`
    /// refusal whose `voter_term` is `u64::MAX`.
    ///
    /// Panics if `peer_id` is not a known peer.
    fn request_vote(&mut self, peer_id: &str, req: &VoteRequest) -> VoteDecision {
        let peer = self.peers.get(peer_id).expect("known peer");

        if peer.reachable {
            let voter = VoterRef {
                store: Rc::clone(&peer.store),
            };
            return voter.consider(req, peer.watermark);
        }

        VoteDecision::Refused(RefusalReason::StaleTerm {
            candidate_term: req.term,
            voter_term: u64::MAX,
        })
    }

    /// Returns the current fake-clock reading, then advances the clock by
    /// one `tick` so the next call sees a later time.
    fn elapsed(&self) -> Duration {
        let mut clock = self.elapsed_steps.borrow_mut();
        let reading = *clock;
        *clock = reading + self.tick;
        reading
    }

    /// Records the term the coordinator asked to bump to.
    fn bump_term(&mut self, new_term: u64) {
        self.bumped_term = Some(new_term);
    }

    /// Records the term the coordinator promoted the winner in.
    fn promote(&mut self, new_term: u64) {
        self.promoted_term = Some(new_term);
    }
}
/// Handle used by `ClusterTransport::request_vote` to run a vote request
/// against a peer's shared in-memory store.
struct VoterRef {
/// The peer's last-vote store, shared via `Rc`.
store: Rc<MemoryLastVoteStore>,
}
impl VoterRef {
    /// Builds a throwaway `Voter` named "peer" over the shared store and
    /// returns its decision for `req` at the given watermark.
    ///
    /// Panics if the voter reports a store error.
    fn consider(&self, req: &VoteRequest, watermark: u64) -> VoteDecision {
        let backing = RcStore(Rc::clone(&self.store));
        Voter::new("peer", backing)
            .consider(req, watermark)
            .expect("memory store infallible")
    }
}
/// Newtype over a shared `MemoryLastVoteStore`; its `LastVoteStore` impl
/// forwards to the wrapped store so several voters can use the same state.
struct RcStore(Rc<MemoryLastVoteStore>);
impl LastVoteStore for RcStore {
    /// Forwards to the wrapped store's `load`.
    fn load(&self) -> Result<LastVote, LastVoteError> {
        let Self(inner) = self;
        inner.load()
    }

    /// Forwards to the wrapped store's `persist`.
    fn persist(&self, vote: &LastVote) -> Result<(), LastVoteError> {
        let Self(inner) = self;
        inner.persist(vote)
    }
}
/// Builds an `ElectionRequest` for a voting data member `id`, with the given
/// current term, last log LSN and commit watermark.
fn candidate_request(id: &str, current_term: u64, lsn: u64, watermark: u64) -> ElectionRequest {
    let candidate = Member::data_voting(id);
    ElectionRequest {
        candidate,
        current_term,
        last_log_lsn: lsn,
        commit_watermark: watermark,
    }
}
/// Election timeout (60 s) for tests that are not exercising the timeout path.
const LONG: Duration = Duration::from_secs(60);
/// With three reachable voters at watermark 100 and a candidate at LSN 150,
/// the election from term 4 is expected to end `Elected` in term 5 with 2 of
/// 2 needed votes, and to both bump and promote at term 5.
#[test]
fn covering_candidate_is_elected_by_majority() {
    let members: Vec<Member> = ["a", "b", "c"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    let mut transport = ClusterTransport::new(members).with_watermark(100);
    let request = candidate_request("a", 4, 150, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::Elected {
        term: 5,
        votes: 2,
        needed: 2,
    };
    assert_eq!(outcome, expected);
    assert_eq!(
        transport.bumped_term,
        Some(5),
        "real election bumps the term"
    );
    assert_eq!(
        transport.promoted_term,
        Some(5),
        "winner is promoted via handover"
    );
}
/// A candidate whose LSN (80) is below its own commit watermark (100) is
/// expected to be `NotElectable`, with no term bump and no promotion.
#[test]
fn candidate_not_covering_watermark_cannot_win() {
    let members: Vec<Member> = ["a", "b", "c"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    let mut transport = ClusterTransport::new(members).with_watermark(100);
    let request = candidate_request("a", 4, 80, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    assert_eq!(outcome, ElectionOutcome::NotElectable);
    assert_eq!(
        transport.bumped_term, None,
        "no term bump for an unelectable candidate"
    );
    assert_eq!(transport.promoted_term, None);
}
/// Peers hold a watermark of 500 while the candidate's log only reaches LSN
/// 120. The expected outcome is `ProbeFailed` with 1 vote of the 2 needed
/// (presumably the candidate's own), and no term bump.
#[test]
fn voters_below_watermark_refuse_so_lagging_candidate_loses() {
    let members: Vec<Member> = ["a", "b", "c"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    let mut transport = ClusterTransport::new(members).with_watermark(500);
    let request = candidate_request("a", 4, 120, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::ProbeFailed {
        votes: 1,
        needed: 2,
    };
    assert_eq!(outcome, expected);
    assert_eq!(transport.bumped_term, None);
}
/// In a five-member cluster with "c", "d" and "e" unreachable, the probe is
/// expected to fail with 2 of 3 needed votes, leaving both the bumped and
/// promoted terms unset.
#[test]
fn dry_run_probe_does_not_bump_term_when_it_fails() {
    let members: Vec<Member> = ["a", "b", "c", "d", "e"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    let mut transport = ClusterTransport::new(members).with_watermark(100);
    for unreachable in &["c", "d", "e"] {
        transport.partition_away(unreachable);
    }
    let request = candidate_request("a", 4, 150, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::ProbeFailed {
        votes: 2,
        needed: 3,
    };
    assert_eq!(outcome, expected);
    assert_eq!(
        transport.bumped_term, None,
        "a failed probe must NOT bump the term"
    );
    assert_eq!(transport.promoted_term, None);
}
/// With data member "b" unreachable, candidate "a" is still expected to be
/// elected in term 5 with 2 of 2 votes in a cluster that also has a witness.
#[test]
fn witness_vote_counts_toward_quorum() {
    let mut transport = ClusterTransport::new(vec![
        Member::data_voting("a"),
        Member::data_voting("b"),
        Member::witness("w"),
    ])
    .with_watermark(100);
    transport.partition_away("b");
    let request = candidate_request("a", 4, 150, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::Elected {
        term: 5,
        votes: 2,
        needed: 2,
    };
    assert_eq!(
        outcome, expected,
        "the witness vote completes the majority",
    );
}
/// Two voting data members plus one catching-up replica: the candidate is
/// expected to be elected in term 5 with 2 votes and a quorum of 2.
#[test]
fn catching_up_replica_neither_votes_nor_enlarges_quorum() {
    let mut transport = ClusterTransport::new(vec![
        Member::data_voting("a"),
        Member::data_voting("b"),
        Member::data_catching_up("c"),
    ])
    .with_watermark(100);
    let request = candidate_request("a", 4, 150, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::Elected {
        term: 5,
        votes: 2,
        needed: 2,
    };
    assert_eq!(outcome, expected);
}
/// With both other voters unreachable and a 5 ms timeout (shorter than the
/// fake clock's 10 ms step), the run is expected to end `TimedOut` with
/// `needed: 2` and no term bump.
#[test]
fn election_times_out_without_a_majority() {
    let members: Vec<Member> = ["a", "b", "c"]
        .iter()
        .map(|id| Member::data_voting(*id))
        .collect();
    let mut transport = ClusterTransport::new(members).with_watermark(100);
    for unreachable in &["b", "c"] {
        transport.partition_away(unreachable);
    }
    let request = candidate_request("a", 4, 150, 100);
    let short_timeout = Duration::from_millis(5);

    let outcome = ElectionCoordinator::run(&request, &mut transport, short_timeout);

    assert!(
        matches!(outcome, ElectionOutcome::TimedOut { needed: 2, .. }),
        "got {outcome:?}",
    );
    assert_eq!(
        transport.bumped_term, None,
        "timed-out probe does not bump term"
    );
}
/// Candidate "a" can reach only "b" in a five-member cluster. The expected
/// outcome is `ProbeFailed` with 2 of 3 needed votes, and nobody is promoted.
#[test]
fn partition_minority_candidate_cannot_win() {
    let members = vec![
        Member::data_voting("a"),
        Member::data_voting("b"),
        Member::data_voting("c"),
        Member::data_voting("d"),
        Member::data_voting("e"),
    ];
    let mut transport = ClusterTransport::new(members).with_watermark(100);
    for unreachable in &["c", "d", "e"] {
        transport.partition_away(unreachable);
    }
    let request = candidate_request("a", 4, 150, 100);

    let outcome = ElectionCoordinator::run(&request, &mut transport, LONG);

    let expected = ElectionOutcome::ProbeFailed {
        votes: 2,
        needed: 3,
    };
    assert_eq!(outcome, expected);
    assert_eq!(
        transport.promoted_term, None,
        "minority side promotes no one"
    );
}
/// Split-brain scenario over five members whose last-vote stores are shared
/// between two transports, each modelling one side of a partition.
///
/// 1. The majority side ("c", "d", "e" reachable) is expected to elect "c"
///    for term 5 with 3 of 3 needed votes.
/// 2. The minority side ("a", "b" reachable) must then fail to elect "a".
/// 3. A voter built directly on "d"'s shared store must refuse "a" in term 5
///    as `AlreadyVoted` for "c".
#[test]
fn partition_majority_candidate_wins_and_minority_cannot_take_same_term() {
    let ids = ["a", "b", "c", "d", "e"];
    let members: Vec<Member> = ids.iter().map(|id| Member::data_voting(*id)).collect();
    let stores: HashMap<&str, Rc<MemoryLastVoteStore>> = ids
        .iter()
        .map(|id| (*id, Rc::new(MemoryLastVoteStore::new())))
        .collect();

    // Builds one side of the partition: every peer uses the shared stores,
    // and the peers listed in `cut_off` are unreachable from this side.
    let side_without = |cut_off: &[&str]| {
        let mut transport = ClusterTransport::new(members.clone()).with_watermark(100);
        for id in &ids {
            transport.share_store(id, Rc::clone(&stores[id]));
        }
        for id in cut_off {
            transport.partition_away(id);
        }
        transport
    };

    // Majority side: "c" runs for election and should win term 5.
    let mut majority = side_without(&["a", "b"]);
    let c_req = candidate_request("c", 4, 150, 100);
    let c_outcome = ElectionCoordinator::run(&c_req, &mut majority, LONG);
    let c_expected = ElectionOutcome::Elected {
        term: 5,
        votes: 3,
        needed: 3,
    };
    assert_eq!(c_outcome, c_expected, "majority side elects c for term 5");
    assert_eq!(majority.promoted_term, Some(5));

    // Minority side: "a" runs from the same starting term and must not win.
    let mut minority = side_without(&["c", "d", "e"]);
    let a_req = candidate_request("a", 4, 150, 100);
    let a_outcome = ElectionCoordinator::run(&a_req, &mut minority, LONG);
    assert!(
        !a_outcome.is_elected(),
        "minority candidate must not also win term 5, got {a_outcome:?}",
    );
    assert_eq!(minority.promoted_term, None, "no second primary for term 5");

    // Ask "d"'s store directly: its term-5 vote already belongs to "c".
    let shared_voter = Voter::new("d", RcStore(Rc::clone(&stores["d"])));
    let direct_ask = shared_voter
        .consider(&VoteRequest::real("a", 5, 150), 100)
        .unwrap();
    assert_eq!(
        direct_ask,
        VoteDecision::Refused(RefusalReason::AlreadyVoted {
            term: 5,
            voted_for: "c".to_string(),
        }),
        "a voter that granted c in term 5 blocks a second primary in term 5",
    );
}