use amaters_cluster::{
AppendEntriesRequest, AppendEntriesResponse, Command, LogEntry, NodeState, RaftConfig,
RaftNode, RequestVoteRequest, RequestVoteResponse,
};
fn create_three_node_cluster() -> (RaftNode, RaftNode, RaftNode) {
let peers = vec![1, 2, 3];
let n1 = RaftNode::new(RaftConfig::new(1, peers.clone())).expect("node 1 creation failed");
let n2 = RaftNode::new(RaftConfig::new(2, peers.clone())).expect("node 2 creation failed");
let n3 = RaftNode::new(RaftConfig::new(3, peers)).expect("node 3 creation failed");
(n1, n2, n3)
}
fn elect_leader(leader: &RaftNode, voters: &[&RaftNode]) {
let vote_requests = leader.start_election();
assert!(
!vote_requests.is_empty(),
"start_election should produce vote requests"
);
for voter in voters {
let req = RequestVoteRequest::new(
leader.current_term(),
leader.node_id(),
leader.last_log_index(),
0, );
let resp = voter.handle_request_vote(req);
if resp.vote_granted {
let became_leader = leader.handle_vote_response(voter.node_id(), resp);
if became_leader {
break;
}
}
}
}
#[test]
fn test_three_node_election_produces_exactly_one_leader() {
let (n1, n2, n3) = create_three_node_cluster();
assert_eq!(n1.state(), NodeState::Follower);
assert_eq!(n2.state(), NodeState::Follower);
assert_eq!(n3.state(), NodeState::Follower);
elect_leader(&n1, &[&n2, &n3]);
assert_eq!(n1.state(), NodeState::Leader);
assert_eq!(n1.current_term(), 1);
assert_eq!(n2.state(), NodeState::Follower);
assert_eq!(n3.state(), NodeState::Follower);
}
#[test]
fn test_election_requires_quorum() {
let (n1, _n2, _n3) = create_three_node_cluster();
let _vote_requests = n1.start_election();
assert_eq!(n1.state(), NodeState::Candidate);
let rejected = RequestVoteResponse::rejected(1);
let became_leader = n1.handle_vote_response(2, rejected);
assert!(!became_leader);
assert_eq!(n1.state(), NodeState::Candidate);
}
#[test]
fn test_election_with_five_nodes() {
let peers = vec![1, 2, 3, 4, 5];
let n1 = RaftNode::new(RaftConfig::new(1, peers.clone())).expect("n1");
let n2 = RaftNode::new(RaftConfig::new(2, peers.clone())).expect("n2");
let n3 = RaftNode::new(RaftConfig::new(3, peers.clone())).expect("n3");
let n4 = RaftNode::new(RaftConfig::new(4, peers.clone())).expect("n4");
let _n5 = RaftNode::new(RaftConfig::new(5, peers)).expect("n5");
let _vote_requests = n1.start_election();
let req = RequestVoteRequest::new(n1.current_term(), n1.node_id(), 0, 0);
let resp2 = n2.handle_request_vote(req.clone());
assert!(resp2.vote_granted);
let became_leader = n1.handle_vote_response(2, resp2);
assert!(!became_leader);
let req = RequestVoteRequest::new(n1.current_term(), n1.node_id(), 0, 0);
let resp3 = n3.handle_request_vote(req);
assert!(resp3.vote_granted);
let became_leader = n1.handle_vote_response(3, resp3);
assert!(became_leader);
assert_eq!(n1.state(), NodeState::Leader);
assert_eq!(n4.state(), NodeState::Follower);
}
#[test]
fn test_log_replication_via_append_entries() {
let (n1, n2, _n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &_n3]);
assert_eq!(n1.state(), NodeState::Leader);
let idx1 = n1
.propose(Command::from_str("SET x 1"))
.expect("propose 1 failed");
let idx2 = n1
.propose(Command::from_str("SET y 2"))
.expect("propose 2 failed");
assert_eq!(idx1, 1);
assert_eq!(idx2, 2);
let repl_requests = n1.create_replication_requests();
assert!(
!repl_requests.is_empty(),
"leader should create replication requests"
);
let (_, req_for_n2) = repl_requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("should have request for node 2");
let resp = n2.handle_append_entries(req_for_n2.clone());
assert!(resp.success, "follower should accept valid entries");
assert_eq!(resp.last_log_index, 2);
assert_eq!(n2.last_log_index(), n1.last_log_index());
}
#[test]
fn test_heartbeat_does_not_change_log() {
let (n1, n2, n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &n3]);
let heartbeats = n1.create_heartbeats();
assert!(!heartbeats.is_empty(), "leader should send heartbeats");
for (peer_id, hb) in &heartbeats {
assert!(hb.is_heartbeat(), "heartbeat entries must be empty");
let target = if *peer_id == 2 { &n2 } else { &n3 };
let resp = target.handle_append_entries(hb.clone());
assert!(resp.success, "heartbeat should be accepted by follower");
}
assert_eq!(n1.last_log_index(), 0);
assert_eq!(n2.last_log_index(), 0);
assert_eq!(n3.last_log_index(), 0);
}
#[test]
fn test_propose_as_follower_fails() {
let (n1, _n2, _n3) = create_three_node_cluster();
assert_eq!(n1.state(), NodeState::Follower);
let result = n1.propose(Command::from_str("SET x 1"));
assert!(result.is_err(), "follower should reject proposals");
}
#[test]
fn test_term_advancement_on_higher_term_vote_request() {
let (n1, n2, _n3) = create_three_node_cluster();
assert_eq!(n1.current_term(), 0);
n2.start_election();
assert_eq!(n2.current_term(), 1);
let req = RequestVoteRequest::new(1, 2, 0, 0);
let resp = n1.handle_request_vote(req);
assert!(resp.vote_granted);
assert_eq!(n1.current_term(), 1);
assert_eq!(n1.state(), NodeState::Follower);
}
#[test]
fn test_leader_steps_down_on_higher_term() {
let (n1, n2, n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &n3]);
assert_eq!(n1.state(), NodeState::Leader);
assert_eq!(n1.current_term(), 1);
let higher_term_req = AppendEntriesRequest::heartbeat(2, 2, 0, 0, 0);
let resp = n1.handle_append_entries(higher_term_req);
assert!(resp.success);
assert_eq!(n1.current_term(), 2);
assert_eq!(n1.state(), NodeState::Follower);
}
#[test]
fn test_candidate_steps_down_on_higher_term_vote_response() {
let (n1, _n2, _n3) = create_three_node_cluster();
n1.start_election();
assert_eq!(n1.state(), NodeState::Candidate);
assert_eq!(n1.current_term(), 1);
let resp = RequestVoteResponse::rejected(5);
let became_leader = n1.handle_vote_response(2, resp);
assert!(!became_leader);
assert_eq!(n1.current_term(), 5);
assert_eq!(n1.state(), NodeState::Follower);
}
#[test]
fn test_stale_vote_request_rejected() {
let (n1, n2, _n3) = create_three_node_cluster();
n1.start_election(); n1.start_election(); n1.start_election(); assert_eq!(n1.current_term(), 3);
let stale_req = RequestVoteRequest::new(1, 2, 0, 0);
let resp = n1.handle_request_vote(stale_req);
assert!(!resp.vote_granted, "stale term vote should be rejected");
assert_eq!(resp.term, 3, "response should contain current term");
}
#[test]
fn test_stale_append_entries_rejected() {
let (n1, _n2, _n3) = create_three_node_cluster();
n1.start_election(); n1.start_election();
let stale_req = AppendEntriesRequest::heartbeat(1, 2, 0, 0, 0);
let resp = n1.handle_append_entries(stale_req);
assert!(!resp.success, "stale term AppendEntries should be rejected");
assert_eq!(resp.term, 2);
}
#[test]
fn test_replication_response_updates_leader_state() {
let (n1, n2, n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &n3]);
n1.propose(Command::from_str("SET a 1"))
.expect("propose failed");
let repl = n1.create_replication_requests();
let (_, req_for_n2) = repl.iter().find(|(p, _)| *p == 2).expect("request for n2");
let resp = n2.handle_append_entries(req_for_n2.clone());
assert!(resp.success);
n1.handle_replication_response(2, resp)
.expect("handle response failed");
assert_eq!(n1.commit_index(), 1);
}
#[test]
fn test_leader_steps_down_on_higher_term_replication_response() {
let (n1, n2, n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &n3]);
assert_eq!(n1.state(), NodeState::Leader);
let resp = AppendEntriesResponse::new(10, false, 0, None, None);
n1.handle_replication_response(2, resp)
.expect("handle response failed");
assert_eq!(n1.state(), NodeState::Follower);
assert_eq!(n1.current_term(), 10);
}
#[test]
fn test_successive_elections_increment_term() {
let (n1, n2, n3) = create_three_node_cluster();
elect_leader(&n1, &[&n2, &n3]);
assert_eq!(n1.current_term(), 1);
assert_eq!(n1.state(), NodeState::Leader);
let _vote_requests = n2.start_election();
assert_eq!(n2.current_term(), 2);
assert_eq!(n2.state(), NodeState::Candidate);
let req = RequestVoteRequest::new(2, 2, 0, 0);
let resp = n3.handle_request_vote(req);
assert!(resp.vote_granted);
let became_leader = n2.handle_vote_response(3, resp);
assert!(became_leader);
assert_eq!(n2.state(), NodeState::Leader);
assert_eq!(n2.current_term(), 2);
let hb = AppendEntriesRequest::heartbeat(2, 2, 0, 0, 0);
let resp = n1.handle_append_entries(hb);
assert!(resp.success);
assert_eq!(n1.state(), NodeState::Follower);
assert_eq!(n1.current_term(), 2);
}
#[test]
fn test_duplicate_vote_for_same_candidate() {
let (n1, n2, _n3) = create_three_node_cluster();
n1.start_election();
let req = RequestVoteRequest::new(1, 1, 0, 0);
let resp1 = n2.handle_request_vote(req.clone());
assert!(resp1.vote_granted);
let resp2 = n2.handle_request_vote(req);
assert!(resp2.vote_granted);
}
#[test]
fn test_vote_rejected_when_already_voted_for_different_candidate() {
let (n1, n2, n3) = create_three_node_cluster();
n1.start_election();
let req_from_n1 = RequestVoteRequest::new(1, 1, 0, 0);
let resp = n3.handle_request_vote(req_from_n1);
assert!(resp.vote_granted);
let req_from_n2 = RequestVoteRequest::new(1, 2, 0, 0);
let resp = n3.handle_request_vote(req_from_n2);
assert!(
!resp.vote_granted,
"should reject vote for different candidate in same term"
);
}