use amaters_cluster::{
AppendEntriesRequest, AppendEntriesResponse, Command, LogEntry, NodeState, RaftConfig,
RaftNode, RequestVoteRequest, RequestVoteResponse,
};
/// Builds three `RaftNode`s with ids 1, 2 and 3, each configured with the peer list
/// `[1, 2, 3]`.
///
/// Panics (via `expect`) if any of the nodes cannot be constructed.
fn create_three_node_cluster() -> (RaftNode, RaftNode, RaftNode) {
    let membership = vec![1, 2, 3];

    let cfg_one = RaftConfig::new(1, membership.clone());
    let cfg_two = RaftConfig::new(2, membership.clone());
    // The last config takes ownership of the peer list.
    let cfg_three = RaftConfig::new(3, membership);

    (
        RaftNode::new(cfg_one).expect("node 1 creation failed"),
        RaftNode::new(cfg_two).expect("node 2 creation failed"),
        RaftNode::new(cfg_three).expect("node 3 creation failed"),
    )
}
/// Drives `leader` through an election using the given `voters`.
///
/// Calls `start_election` on `leader` and asserts that it yields at least one vote
/// request. Each voter is then handed a hand-built `RequestVoteRequest` carrying the
/// leader's current term, node id and last log index (the final argument is fixed at 0).
/// Granted responses are fed back into `leader`; the loop stops as soon as
/// `handle_vote_response` returns `true`.
///
/// This helper does not itself assert that `leader` ended up winning.
fn elect_leader(leader: &RaftNode, voters: &[&RaftNode]) {
    let outgoing = leader.start_election();
    assert!(
        !outgoing.is_empty(),
        "start_election should produce vote requests"
    );

    for voter in voters.iter() {
        let ballot = voter.handle_request_vote(RequestVoteRequest::new(
            leader.current_term(),
            leader.node_id(),
            leader.last_log_index(),
            0,
        ));
        // Short-circuit: a rejected ballot is never reported back to the candidate.
        if ballot.vote_granted && leader.handle_vote_response(voter.node_id(), ballot) {
            break;
        }
    }
}
/// After node 1 runs an election against nodes 2 and 3, asserts that node 1 is `Leader`
/// in term 1 while the other two nodes are still `Follower`.
#[test]
fn test_three_node_election_produces_exactly_one_leader() {
    let (candidate, peer_a, peer_b) = create_three_node_cluster();

    // All three nodes are asserted to start out as followers.
    for node in [&candidate, &peer_a, &peer_b] {
        assert_eq!(node.state(), NodeState::Follower);
    }

    elect_leader(&candidate, &[&peer_a, &peer_b]);

    assert_eq!(candidate.state(), NodeState::Leader);
    assert_eq!(candidate.current_term(), 1);
    for node in [&peer_a, &peer_b] {
        assert_eq!(node.state(), NodeState::Follower);
    }
}
/// Asserts that a candidate which receives a single rejected vote response does not
/// become leader and stays in the `Candidate` state.
#[test]
fn test_election_requires_quorum() {
    let (candidate, _n2, _n3) = create_three_node_cluster();

    let _vote_requests = candidate.start_election();
    assert_eq!(candidate.state(), NodeState::Candidate);

    // Feed back a rejection attributed to node 2.
    let won = candidate.handle_vote_response(2, RequestVoteResponse::rejected(1));

    assert!(!won);
    assert_eq!(candidate.state(), NodeState::Candidate);
}
/// Exercises the vote threshold in a five-node configuration.
///
/// Node 1 starts an election. The test asserts that one granted vote (from node 2) does
/// not make it leader, and that a second granted vote (from node 3) does. Node 4 receives
/// no messages and is asserted to still be a `Follower`.
#[test]
fn test_election_with_five_nodes() {
    let peers = vec![1, 2, 3, 4, 5];
    let n1 = RaftNode::new(RaftConfig::new(1, peers.clone())).expect("n1");
    let n2 = RaftNode::new(RaftConfig::new(2, peers.clone())).expect("n2");
    let n3 = RaftNode::new(RaftConfig::new(3, peers.clone())).expect("n3");
    let n4 = RaftNode::new(RaftConfig::new(4, peers.clone())).expect("n4");
    let _n5 = RaftNode::new(RaftConfig::new(5, peers)).expect("n5");

    let _vote_requests = n1.start_election();

    // First granted vote: asserted NOT to be enough to win.
    // The request is rebuilt below, so it is moved here rather than cloned.
    let req = RequestVoteRequest::new(n1.current_term(), n1.node_id(), 0, 0);
    let resp2 = n2.handle_request_vote(req);
    assert!(resp2.vote_granted);
    let became_leader = n1.handle_vote_response(2, resp2);
    assert!(!became_leader);

    // Second granted vote: asserted to make node 1 the leader.
    let req = RequestVoteRequest::new(n1.current_term(), n1.node_id(), 0, 0);
    let resp3 = n3.handle_request_vote(req);
    assert!(resp3.vote_granted);
    let became_leader = n1.handle_vote_response(3, resp3);
    assert!(became_leader);

    assert_eq!(n1.state(), NodeState::Leader);
    assert_eq!(n4.state(), NodeState::Follower);
}
/// Replicates two proposed commands from the leader to node 2 via `AppendEntries`.
///
/// Asserts that the proposals are assigned indices 1 and 2, that node 2 accepts the
/// replication request addressed to it, and that node 2's last log index then equals
/// the leader's.
#[test]
fn test_log_replication_via_append_entries() {
    // Node 3 takes part in the election, so it gets a regular (non-underscore) name.
    let (n1, n2, n3) = create_three_node_cluster();
    elect_leader(&n1, &[&n2, &n3]);
    assert_eq!(n1.state(), NodeState::Leader);

    let idx1 = n1
        .propose(Command::from_str("SET x 1"))
        .expect("propose 1 failed");
    let idx2 = n1
        .propose(Command::from_str("SET y 2"))
        .expect("propose 2 failed");
    assert_eq!(idx1, 1);
    assert_eq!(idx2, 2);

    let repl_requests = n1.create_replication_requests();
    assert!(
        !repl_requests.is_empty(),
        "leader should create replication requests"
    );

    // Only the request addressed to node 2 is delivered in this test.
    let (_, req_for_n2) = repl_requests
        .iter()
        .find(|(peer, _)| *peer == 2)
        .expect("should have request for node 2");
    let resp = n2.handle_append_entries(req_for_n2.clone());

    assert!(resp.success, "follower should accept valid entries");
    assert_eq!(resp.last_log_index, 2);
    assert_eq!(n2.last_log_index(), n1.last_log_index());
}
/// Delivers the leader's heartbeats to both followers and asserts that every heartbeat
/// is accepted and that all three nodes still report a last log index of 0.
#[test]
fn test_heartbeat_does_not_change_log() {
    let (leader, follower_two, follower_three) = create_three_node_cluster();
    elect_leader(&leader, &[&follower_two, &follower_three]);

    let beats = leader.create_heartbeats();
    assert!(!beats.is_empty(), "leader should send heartbeats");

    for (peer_id, beat) in beats.iter() {
        assert!(beat.is_heartbeat(), "heartbeat entries must be empty");
        // Anything not addressed to node 2 is delivered to node 3.
        let recipient = if *peer_id != 2 {
            &follower_three
        } else {
            &follower_two
        };
        let ack = recipient.handle_append_entries(beat.clone());
        assert!(ack.success, "heartbeat should be accepted by follower");
    }

    for node in [&leader, &follower_two, &follower_three] {
        assert_eq!(node.last_log_index(), 0);
    }
}
/// Asserts that `propose` returns an error when called on a node in the `Follower` state.
#[test]
fn test_propose_as_follower_fails() {
    let (node, _n2, _n3) = create_three_node_cluster();
    assert_eq!(node.state(), NodeState::Follower);

    let outcome = node.propose(Command::from_str("SET x 1"));

    assert!(outcome.is_err(), "follower should reject proposals");
}
/// Asserts that a node at term 0 which receives a term-1 vote request grants the vote,
/// moves to term 1, and remains a `Follower`.
#[test]
fn test_term_advancement_on_higher_term_vote_request() {
    let (voter, candidate, _n3) = create_three_node_cluster();
    assert_eq!(voter.current_term(), 0);

    candidate.start_election();
    assert_eq!(candidate.current_term(), 1);

    // Hand-built request: term 1 from candidate id 2, remaining arguments 0.
    let ballot = voter.handle_request_vote(RequestVoteRequest::new(1, 2, 0, 0));

    assert!(ballot.vote_granted);
    assert_eq!(voter.current_term(), 1);
    assert_eq!(voter.state(), NodeState::Follower);
}
/// Asserts that a term-1 leader which receives a term-2 heartbeat accepts it, adopts
/// term 2, and becomes a `Follower`.
#[test]
fn test_leader_steps_down_on_higher_term() {
    let (old_leader, peer_a, peer_b) = create_three_node_cluster();
    elect_leader(&old_leader, &[&peer_a, &peer_b]);
    assert_eq!(old_leader.state(), NodeState::Leader);
    assert_eq!(old_leader.current_term(), 1);

    // Heartbeat with term 2, attributed to node 2.
    let ack = old_leader.handle_append_entries(AppendEntriesRequest::heartbeat(2, 2, 0, 0, 0));

    assert!(ack.success);
    assert_eq!(old_leader.current_term(), 2);
    assert_eq!(old_leader.state(), NodeState::Follower);
}
/// Asserts that a term-1 candidate which receives a rejection built with
/// `RequestVoteResponse::rejected(5)` does not win, ends up at term 5, and becomes a
/// `Follower`.
#[test]
fn test_candidate_steps_down_on_higher_term_vote_response() {
    let (candidate, _n2, _n3) = create_three_node_cluster();

    candidate.start_election();
    assert_eq!(candidate.state(), NodeState::Candidate);
    assert_eq!(candidate.current_term(), 1);

    let won = candidate.handle_vote_response(2, RequestVoteResponse::rejected(5));

    assert!(!won);
    assert_eq!(candidate.current_term(), 5);
    assert_eq!(candidate.state(), NodeState::Follower);
}
/// Asserts that a node at term 3 rejects a vote request carrying term 1 and reports its
/// own term (3) in the response.
#[test]
fn test_stale_vote_request_rejected() {
    // Only node 1 is exercised; the stale request below is hand-built.
    let (n1, _n2, _n3) = create_three_node_cluster();

    // Three consecutive elections; the term is asserted to be 3 afterwards.
    for _ in 0..3 {
        n1.start_election();
    }
    assert_eq!(n1.current_term(), 3);

    let stale_req = RequestVoteRequest::new(1, 2, 0, 0);
    let resp = n1.handle_request_vote(stale_req);

    assert!(!resp.vote_granted, "stale term vote should be rejected");
    assert_eq!(resp.term, 3, "response should contain current term");
}
/// Asserts that, after two elections on node 1, a heartbeat carrying term 1 is rejected
/// and the response reports term 2.
#[test]
fn test_stale_append_entries_rejected() {
    let (node, _n2, _n3) = create_three_node_cluster();

    // Two consecutive elections on the same node.
    for _ in 0..2 {
        node.start_election();
    }

    let reply = node.handle_append_entries(AppendEntriesRequest::heartbeat(1, 2, 0, 0, 0));

    assert!(!reply.success, "stale term AppendEntries should be rejected");
    assert_eq!(reply.term, 2);
}
/// Asserts that once node 2 acknowledges a replicated entry and the leader processes
/// that acknowledgement, the leader's commit index is 1.
#[test]
fn test_replication_response_updates_leader_state() {
    let (leader, follower, bystander) = create_three_node_cluster();
    elect_leader(&leader, &[&follower, &bystander]);

    leader
        .propose(Command::from_str("SET a 1"))
        .expect("propose failed");

    let outgoing = leader.create_replication_requests();
    let (_, for_follower) = outgoing
        .iter()
        .find(|(p, _)| *p == 2)
        .expect("request for n2");

    let ack = follower.handle_append_entries(for_follower.clone());
    assert!(ack.success);

    leader
        .handle_replication_response(2, ack)
        .expect("handle response failed");
    assert_eq!(leader.commit_index(), 1);
}
/// Asserts that a leader which processes an `AppendEntriesResponse` built as
/// `new(10, false, 0, None, None)` becomes a `Follower` and reports term 10.
#[test]
fn test_leader_steps_down_on_higher_term_replication_response() {
    let (leader, peer_a, peer_b) = create_three_node_cluster();
    elect_leader(&leader, &[&peer_a, &peer_b]);
    assert_eq!(leader.state(), NodeState::Leader);

    let higher_term_reply = AppendEntriesResponse::new(10, false, 0, None, None);
    leader
        .handle_replication_response(2, higher_term_reply)
        .expect("handle response failed");

    assert_eq!(leader.state(), NodeState::Follower);
    assert_eq!(leader.current_term(), 10);
}
/// Runs two elections back to back and checks the terms along the way.
///
/// Node 1 is elected in term 1. Node 2 then starts an election (asserted to be term 2),
/// wins with node 3's vote, and sends node 1 a term-2 heartbeat. Node 1 is asserted to
/// accept it, become a `Follower`, and report term 2.
#[test]
fn test_successive_elections_increment_term() {
    let (first_leader, second_leader, bystander) = create_three_node_cluster();

    elect_leader(&first_leader, &[&second_leader, &bystander]);
    assert_eq!(first_leader.current_term(), 1);
    assert_eq!(first_leader.state(), NodeState::Leader);

    let _vote_requests = second_leader.start_election();
    assert_eq!(second_leader.current_term(), 2);
    assert_eq!(second_leader.state(), NodeState::Candidate);

    // Node 3 votes for node 2 in term 2.
    let ballot = bystander.handle_request_vote(RequestVoteRequest::new(2, 2, 0, 0));
    assert!(ballot.vote_granted);
    let won = second_leader.handle_vote_response(3, ballot);
    assert!(won);
    assert_eq!(second_leader.state(), NodeState::Leader);
    assert_eq!(second_leader.current_term(), 2);

    // The previous leader hears from the new one.
    let ack = first_leader.handle_append_entries(AppendEntriesRequest::heartbeat(2, 2, 0, 0, 0));
    assert!(ack.success);
    assert_eq!(first_leader.state(), NodeState::Follower);
    assert_eq!(first_leader.current_term(), 2);
}
/// Asserts that a voter grants the vote both times when it receives the same vote
/// request from the same candidate twice.
#[test]
fn test_duplicate_vote_for_same_candidate() {
    let (candidate, voter, _n3) = create_three_node_cluster();
    candidate.start_election();

    let ballot_request = RequestVoteRequest::new(1, 1, 0, 0);

    let first = voter.handle_request_vote(ballot_request.clone());
    assert!(first.vote_granted);

    // Same request delivered a second time.
    let second = voter.handle_request_vote(ballot_request);
    assert!(second.vote_granted);
}
/// Asserts that a voter which has granted its term-1 vote to candidate 1 rejects a
/// term-1 vote request from candidate 2.
#[test]
fn test_vote_rejected_when_already_voted_for_different_candidate() {
    // Node 2 is never touched: its competing request is hand-built with literal id 2.
    let (n1, _n2, n3) = create_three_node_cluster();
    n1.start_election();

    let req_from_n1 = RequestVoteRequest::new(1, 1, 0, 0);
    let resp = n3.handle_request_vote(req_from_n1);
    assert!(resp.vote_granted);

    // Same term, different candidate id.
    let req_from_n2 = RequestVoteRequest::new(1, 2, 0, 0);
    let resp = n3.handle_request_vote(req_from_n2);
    assert!(
        !resp.vote_granted,
        "should reject vote for different candidate in same term"
    );
}
/// Proposes three commands on the leader, delivers every replication request to its
/// follower, and feeds each response back to the leader.
///
/// Asserts that the proposal indices are strictly ascending, that every follower accepts
/// its request, that the leader's commit index reaches the third proposal's index, and
/// that both followers report the same last log index as the leader.
#[test]
fn test_three_node_cluster_leader_replication() {
    let (leader, follower_a, follower_b) = create_three_node_cluster();
    elect_leader(&leader, &[&follower_a, &follower_b]);
    assert_eq!(leader.state(), NodeState::Leader);

    let idx1 = leader
        .propose(Command::from_str("SET a 1"))
        .expect("propose 1");
    let idx2 = leader
        .propose(Command::from_str("SET b 2"))
        .expect("propose 2");
    let idx3 = leader
        .propose(Command::from_str("SET c 3"))
        .expect("propose 3");
    assert!(idx1 < idx2 && idx2 < idx3, "indices must be ascending");

    for (peer_id, request) in leader.create_replication_requests() {
        // Requests not addressed to the first follower go to the second one.
        let target = if peer_id != follower_a.node_id() {
            &follower_b
        } else {
            &follower_a
        };
        let ack = target.handle_append_entries(request);
        assert!(ack.success, "follower {} must accept entries", peer_id);
        leader
            .handle_replication_response(peer_id, ack)
            .expect("handle resp");
    }

    assert!(
        leader.commit_index() >= idx3,
        "leader commit_index ({}) must be >= {} after full replication",
        leader.commit_index(),
        idx3
    );

    let leader_last = leader.last_log_index();
    assert_eq!(
        follower_a.last_log_index(),
        leader_last,
        "n2 log must match leader"
    );
    assert_eq!(
        follower_b.last_log_index(),
        leader_last,
        "n3 log must match leader"
    );
}
/// Replicates three entries to node 2 only, asserts node 3 is behind the leader, then
/// delivers node 3's replication request and asserts its last log index matches the
/// leader's.
#[test]
fn test_lagging_follower_catches_up() {
    let (leader, prompt, laggard) = create_three_node_cluster();
    elect_leader(&leader, &[&prompt, &laggard]);
    assert_eq!(leader.state(), NodeState::Leader);

    (0..3u64).for_each(|i| {
        let cmd = Command::from_str(&format!("SET key{} {}", i, i));
        leader.propose(cmd).expect("propose");
    });

    // Round 1: only node 2 receives its request; node 3's is dropped.
    for (peer_id, request) in leader.create_replication_requests() {
        if peer_id != prompt.node_id() {
            continue;
        }
        let ack = prompt.handle_append_entries(request);
        if ack.success {
            leader
                .handle_replication_response(peer_id, ack)
                .expect("handle n2 resp");
        }
    }

    assert!(
        laggard.last_log_index() < leader.last_log_index(),
        "n3 must be lagging: n3={} < n1={}",
        laggard.last_log_index(),
        leader.last_log_index()
    );

    // Round 2: this time only node 3 receives its request.
    for (peer_id, request) in leader.create_replication_requests() {
        if peer_id != laggard.node_id() {
            continue;
        }
        let ack = laggard.handle_append_entries(request);
        if ack.success {
            leader
                .handle_replication_response(peer_id, ack)
                .expect("handle n3 resp");
        }
    }

    assert_eq!(
        laggard.last_log_index(),
        leader.last_log_index(),
        "n3 must catch up to leader after receiving missed entries"
    );
}
/// Adds node 4 through the leader and commits the membership change.
///
/// Asserts that `add_node` succeeds and puts the leader into joint consensus, that
/// `commit_membership_change` succeeds and takes it out again, and that the leader's
/// cluster members then include id 4.
#[test]
fn test_add_node_joint_consensus() {
    let (leader, peer_a, peer_b) = create_three_node_cluster();
    elect_leader(&leader, &[&peer_a, &peer_b]);
    assert_eq!(leader.state(), NodeState::Leader);

    let add_outcome = leader.add_node(4, "127.0.0.1:9004".to_string());
    assert!(
        add_outcome.is_ok(),
        "add_node must succeed for leader: {:?}",
        add_outcome
    );
    assert!(
        leader.is_in_joint_consensus(),
        "cluster must enter joint consensus after add_node"
    );

    let commit_outcome = leader.commit_membership_change();
    assert!(
        commit_outcome.is_ok(),
        "commit_membership_change must succeed: {:?}",
        commit_outcome
    );
    assert!(
        !leader.is_in_joint_consensus(),
        "cluster must exit joint consensus after commit"
    );

    // Keep only the ids from the (id, _) member pairs.
    let member_ids: Vec<_> = leader
        .cluster_members()
        .into_iter()
        .map(|(id, _)| id)
        .collect();
    assert!(
        member_ids.contains(&4),
        "cluster members must include node 4: {:?}",
        member_ids
    );
}
/// Simulates a partition and heal in a five-node cluster by controlling which messages
/// are delivered.
///
/// Phase 1: node 1 is elected using votes from nodes 2 to 5.
/// Phase 2: node 2 starts a new election (asserted to be in a later term) and collects
/// votes only from nodes 3 to 5; node 1 receives nothing during this phase.
/// Phase 3: node 1 is sent a heartbeat from node 2 in the new term and is asserted to
/// accept it, become a `Follower`, and adopt that term.
#[test]
fn test_leader_election_after_partition_simulation() {
    let peers = vec![1u64, 2, 3, 4, 5];
    let n1 = RaftNode::new(RaftConfig::new(1, peers.clone())).expect("n1");
    let n2 = RaftNode::new(RaftConfig::new(2, peers.clone())).expect("n2");
    let n3 = RaftNode::new(RaftConfig::new(3, peers.clone())).expect("n3");
    let n4 = RaftNode::new(RaftConfig::new(4, peers.clone())).expect("n4");
    let n5 = RaftNode::new(RaftConfig::new(5, peers)).expect("n5");

    // Phase 1: elect node 1.
    let _vote_reqs = n1.start_election();
    let term1 = n1.current_term();
    for voter in [&n2, &n3, &n4, &n5] {
        let ballot = voter.handle_request_vote(RequestVoteRequest::new(
            term1,
            n1.node_id(),
            n1.last_log_index(),
            0,
        ));
        if !ballot.vote_granted {
            continue;
        }
        if n1.handle_vote_response(voter.node_id(), ballot) {
            break;
        }
    }
    assert_eq!(
        n1.state(),
        NodeState::Leader,
        "n1 must be leader after phase-1 election"
    );
    let original_leader_term = n1.current_term();

    // Phase 2: node 2 campaigns without node 1 seeing any traffic.
    let _reqs2 = n2.start_election();
    let term2 = n2.current_term();
    assert!(
        term2 > original_leader_term,
        "new election term ({term2}) must exceed original leader term ({original_leader_term})"
    );

    // `any` stops at the first voter whose ballot makes node 2 the leader.
    let new_leader_elected = [&n3, &n4, &n5].iter().any(|voter| {
        let ballot = voter.handle_request_vote(RequestVoteRequest::new(
            term2,
            n2.node_id(),
            n2.last_log_index(),
            0,
        ));
        ballot.vote_granted && n2.handle_vote_response(voter.node_id(), ballot)
    });
    assert!(
        new_leader_elected,
        "n2 must win the election in the non-partitioned group"
    );
    assert_eq!(n2.state(), NodeState::Leader, "n2 must be the new leader");

    // Phase 3: the old leader hears from the new one.
    let ack = n1.handle_append_entries(AppendEntriesRequest::heartbeat(
        term2,
        n2.node_id(),
        0,
        0,
        0,
    ));
    assert!(
        ack.success,
        "n1 must accept a valid higher-term heartbeat after partition heal"
    );
    assert_eq!(
        n1.state(),
        NodeState::Follower,
        "original leader (n1) must step down after seeing higher term from new leader (n2)"
    );
    assert_eq!(
        n1.current_term(),
        term2,
        "n1 must advance its term to the new leader's term"
    );
}
/// Builds a 101-node cluster (ids 1..=101), has the first node start an election, and
/// asks the remaining nodes for votes one at a time until the candidate reports that it
/// became leader. Asserts the first node ends up as `Leader`.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[test]
#[ignore]
fn test_hundred_node_cluster_elects_leader() {
    let peers: Vec<u64> = (1..=101u64).collect();

    let mut nodes: Vec<RaftNode> = Vec::with_capacity(peers.len());
    for &id in &peers {
        nodes.push(RaftNode::new(RaftConfig::new(id, peers.clone())).expect("node creation failed"));
    }

    // The candidate is the first node; every later node acts as a voter.
    let candidate = &nodes[0];
    let _vote_reqs = candidate.start_election();

    for voter in nodes.iter().skip(1) {
        let ballot = voter.handle_request_vote(RequestVoteRequest::new(
            candidate.current_term(),
            candidate.node_id(),
            candidate.last_log_index(),
            0,
        ));
        if !ballot.vote_granted {
            continue;
        }
        if candidate.handle_vote_response(voter.node_id(), ballot) {
            break;
        }
    }

    assert_eq!(
        candidate.state(),
        NodeState::Leader,
        "node 1 must become leader in 101-node cluster"
    );
}
/// Proposes 500 entries in five batches of 100 on a leader configured with a snapshot
/// directory and `snapshot_threshold = 100`, replicating each batch to a single follower
/// and calling `maybe_create_snapshot` after every batch.
///
/// Asserts that at least one snapshot was created, that the leader's commit index equals
/// 500, and that at least four snapshots were created overall.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[test]
#[ignore]
fn test_large_log_compaction() {
    let snap_dir = tempfile::TempDir::new().expect("create temp dir");
    let peers = vec![1u64, 2, 3];

    let mut cfg = RaftConfig::new(1, peers.clone());
    cfg.snapshot_dir = Some(snap_dir.path().to_path_buf());
    cfg.snapshot_threshold = 100;
    cfg.max_snapshots = 10;
    cfg.max_entries_per_message = 100;
    let leader = RaftNode::new(cfg).expect("create leader");
    // Last use of `peers`: move it instead of cloning.
    let follower = RaftNode::new(RaftConfig::new(2, peers)).expect("create follower");

    // Elect node 1 using the single follower's vote.
    leader.start_election();
    let vote_req = RequestVoteRequest::new(
        leader.current_term(),
        leader.node_id(),
        leader.last_log_index(),
        0,
    );
    let vote_resp = follower.handle_request_vote(vote_req);
    assert!(vote_resp.vote_granted, "follower must grant vote");
    let became_leader = leader.handle_vote_response(follower.node_id(), vote_resp);
    assert!(became_leader, "node 1 must become leader");
    assert_eq!(leader.state(), NodeState::Leader);

    const TOTAL_ENTRIES: u64 = 500;
    const BATCH_SIZE: u64 = 100;
    let mut snapshots_created: u64 = 0;

    for batch in 0..(TOTAL_ENTRIES / BATCH_SIZE) {
        // Entry numbers used in the keys are 1-based; `batch_end` is exclusive.
        let batch_start = batch * BATCH_SIZE + 1;
        let batch_end = batch_start + BATCH_SIZE;
        for i in batch_start..batch_end {
            leader
                .propose(Command::from_str(&format!("SET key{i} val{i}")))
                .expect("propose must succeed");
        }

        // Pump replication rounds until the follower has caught up, the leader has
        // nothing to send, or a round makes no progress.
        let mut iterations = 0usize;
        loop {
            iterations += 1;
            assert!(
                iterations <= 200,
                "replication must converge within 200 iterations"
            );

            let repl = leader.create_replication_requests();
            if repl.is_empty() {
                break;
            }

            let mut made_progress = false;
            for (peer_id, req) in repl {
                // Requests for any other peer are dropped; only this follower exists.
                if peer_id == follower.node_id() {
                    let resp = follower.handle_append_entries(req);
                    if resp.success {
                        leader
                            .handle_replication_response(peer_id, resp)
                            .expect("handle replication response");
                        made_progress = true;
                    }
                }
            }

            if follower.last_log_index() >= leader.last_log_index() {
                break;
            }
            if !made_progress {
                // Give up on this batch; the commit-index assertion below reports it.
                break;
            }
        }

        let created = leader
            .maybe_create_snapshot(b"state_machine_snapshot".to_vec())
            .expect("maybe_create_snapshot must not error");
        if created {
            snapshots_created += 1;
        }
    }

    assert!(
        snapshots_created >= 1,
        "at least 1 snapshot must have been created across 500 entries (got {snapshots_created})"
    );
    assert_eq!(
        leader.commit_index(),
        TOTAL_ENTRIES,
        "leader commit_index must reach {TOTAL_ENTRIES} after full replication"
    );
    assert!(
        snapshots_created >= 4,
        "with threshold=100 and 500 entries, at least 4 snapshots must have been created \
(got {snapshots_created})"
    );
}
/// Placeholder for a 10K+ connection test.
///
/// Ignored by default (see the `#[ignore]` reason). The body is only `todo!`, so running
/// it explicitly panics.
#[test]
#[ignore = "requires live cluster with 10K+ connection capacity"]
fn test_high_connection_count_10k() {
todo!("requires live cluster");
}
/// Placeholder for a 100K+ requests-per-second test.
///
/// Ignored by default (see the `#[ignore]` reason). The body is only `todo!`, so running
/// it explicitly panics.
#[test]
#[ignore = "requires live cluster capable of 100K+ rps"]
fn test_high_request_rate_100k_rps() {
todo!("requires live cluster");
}