use super::*;
use tracing_test::traced_test;
fn create_test_node(node_id: NodeId) -> RaftNode {
let config = RaftConfig::new(node_id, vec![1, 2, 3]);
RaftNode::new(config).expect("Failed to create node")
}
fn create_test_node_with_snapshots(node_id: NodeId) -> (RaftNode, tempfile::TempDir) {
let dir = tempfile::TempDir::new().expect("Failed to create temp dir");
let mut config = RaftConfig::new(node_id, vec![1, 2, 3]);
config.snapshot_dir = Some(dir.path().to_path_buf());
config.snapshot_threshold = 5;
let node = RaftNode::new(config).expect("Failed to create node");
(node, dir)
}
#[test]
fn test_new_node() {
let node = create_test_node(1);
assert_eq!(node.node_id(), 1);
assert_eq!(node.current_term(), 0);
assert_eq!(node.state(), NodeState::Follower);
assert_eq!(node.leader_id(), None);
}
#[test]
fn test_start_election() {
let node = create_test_node(1);
let requests = node.start_election();
assert_eq!(node.state(), NodeState::Candidate);
assert_eq!(node.current_term(), 1);
assert_eq!(requests.len(), 2); }
#[test]
fn test_handle_vote_granted() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
let became_leader = node.handle_vote_response(2, resp);
assert!(became_leader);
assert_eq!(node.state(), NodeState::Leader);
}
#[test]
fn test_propose_as_follower() {
let node = create_test_node(1);
let result = node.propose(Command::from_str("test"));
assert!(result.is_err());
}
#[test]
fn test_propose_as_leader() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
let result = node.propose(Command::from_str("test"));
assert!(result.is_ok());
}
#[test]
fn test_maybe_create_snapshot_below_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..3 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(3).expect("Set commit should succeed");
log.set_applied_index(3)
.expect("Set applied should succeed");
}
let created = node
.maybe_create_snapshot(b"state data".to_vec())
.expect("maybe_create_snapshot should succeed");
assert!(!created);
}
#[test]
fn test_maybe_create_snapshot_above_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..6 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("Set commit should succeed");
log.set_applied_index(6)
.expect("Set applied should succeed");
}
let created = node
.maybe_create_snapshot(b"full state".to_vec())
.expect("maybe_create_snapshot should succeed");
assert!(created);
let log = node.log.read();
assert_eq!(log.snapshot_index(), 6);
assert!(log.is_empty());
}
#[test]
fn test_handle_install_snapshot_rpc() {
let (node, _dir) = create_test_node_with_snapshots(1);
{
let mut persistent = node.persistent.write();
persistent.current_term = 5;
}
let req = InstallSnapshotRequest::new_complete(5, 2, 100, 4, b"snapshot data".to_vec());
let resp = node
.handle_install_snapshot(req)
.expect("handle_install_snapshot should succeed");
assert_eq!(resp.term, 5);
let log = node.log.read();
assert_eq!(log.last_index(), 100);
assert_eq!(log.snapshot_index(), 100);
assert_eq!(log.snapshot_term(), 4);
}
#[test]
fn test_handle_install_snapshot_stale_term() {
let (node, _dir) = create_test_node_with_snapshots(1);
{
let mut persistent = node.persistent.write();
persistent.current_term = 10;
}
let req = InstallSnapshotRequest::new_complete(5, 2, 50, 3, b"old data".to_vec());
let resp = node
.handle_install_snapshot(req)
.expect("handle_install_snapshot should succeed");
assert_eq!(resp.term, 10);
let log = node.log.read();
assert_eq!(log.last_index(), 0);
}
#[test]
fn test_follower_needs_snapshot() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..6 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("Set commit should succeed");
log.set_applied_index(6)
.expect("Set applied should succeed");
}
node.maybe_create_snapshot(b"state".to_vec())
.expect("Snapshot should succeed");
{
let mut leader_state_guard = node.leader_state.write();
if let Some(state) = leader_state_guard.as_mut() {
state.next_index.insert(2, 7);
state.next_index.insert(3, 3);
}
}
assert!(node.follower_needs_snapshot(3));
assert!(!node.follower_needs_snapshot(2));
}
#[test]
fn test_raft_node_with_persistence() {
use crate::persistence::MemoryPersistence;
let mp: Arc<dyn RaftPersistence> = Arc::new(MemoryPersistence::new());
{
let config = RaftConfig::new(1, vec![1, 2, 3]);
let node =
RaftNode::with_persistence(config, Arc::clone(&mp)).expect("create with persistence");
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
node.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
node.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
}
{
let config = RaftConfig::new(1, vec![1, 2, 3]);
let node =
RaftNode::with_persistence(config, Arc::clone(&mp)).expect("recover with persistence");
assert_eq!(node.current_term(), 1);
assert_eq!(node.last_log_index(), 2);
assert_eq!(node.state(), NodeState::Follower);
}
}
fn create_leader_node(node_id: NodeId) -> RaftNode {
let node = create_test_node(node_id);
node.start_election();
let resp = RequestVoteResponse::granted(node.current_term());
node.handle_vote_response(if node_id == 1 { 2 } else { 1 }, resp);
assert_eq!(node.state(), NodeState::Leader);
node
}
fn create_leader_5node(node_id: NodeId) -> RaftNode {
let config = RaftConfig::new(node_id, vec![1, 2, 3, 4, 5]);
let node = RaftNode::new(config).expect("Failed to create 5-node");
node.start_election();
let term = node.current_term();
let peers: Vec<NodeId> = vec![1, 2, 3, 4, 5]
.into_iter()
.filter(|&p| p != node_id)
.collect();
node.handle_vote_response(peers[0], RequestVoteResponse::granted(term));
node.handle_vote_response(peers[1], RequestVoteResponse::granted(term));
assert_eq!(node.state(), NodeState::Leader);
node
}
#[test]
fn test_add_node_3_to_4() {
let node = create_leader_node(1);
assert!(!node.is_in_joint_consensus());
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
assert!(node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert!(ids.contains(&4));
assert_eq!(ids.len(), 4);
node.commit_membership_change()
.expect("commit should succeed");
assert!(!node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert_eq!(ids.len(), 4);
assert!(ids.contains(&4));
}
#[test]
fn test_remove_node_5_to_4() {
let node = create_leader_5node(1);
assert!(!node.is_in_joint_consensus());
node.remove_node(5).expect("remove_node should succeed");
assert!(node.is_in_joint_consensus());
node.commit_membership_change()
.expect("commit should succeed");
assert!(!node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert_eq!(ids.len(), 4);
assert!(!ids.contains(&5));
}
#[test]
fn test_reject_concurrent_membership_changes() {
let node = create_leader_node(1);
node.add_node(4, "addr4".to_string())
.expect("first add_node should succeed");
assert!(node.is_in_joint_consensus());
let result = node.add_node(5, "addr5".to_string());
assert!(result.is_err());
match result {
Err(RaftError::MembershipChangeInProgress) => {}
other => panic!("Expected MembershipChangeInProgress, got {:?}", other),
}
}
#[test]
fn test_joint_consensus_quorum_requires_both_configs() {
let node = create_leader_node(1);
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
let mut responding = std::collections::HashSet::new();
responding.insert(1u64);
responding.insert(2);
assert!(!node.has_quorum(&responding));
responding.insert(3);
assert!(node.has_quorum(&responding));
}
#[test]
fn test_leader_removal_triggers_step_down() {
let node = create_leader_node(1);
assert!(!node.is_stepping_down());
node.remove_node(1)
.expect("remove_node(self) should succeed");
node.commit_membership_change()
.expect("commit should succeed");
assert!(node.is_stepping_down());
assert_eq!(node.state(), NodeState::Follower);
}
#[test]
fn test_membership_version_increments() {
let node = create_leader_node(1);
let v0 = node.membership_version();
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
let v1 = node.membership_version();
assert!(v1 > v0, "version should increase after entering joint");
node.commit_membership_change()
.expect("commit should succeed");
let v2 = node.membership_version();
assert!(v2 >= v1);
}
#[test]
fn test_get_current_members() {
let node = create_leader_node(1);
let members = node.cluster_members();
assert_eq!(members.len(), 3);
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert!(ids.contains(&1));
assert!(ids.contains(&2));
assert!(ids.contains(&3));
}
#[test]
fn test_add_node_already_member_is_error() {
let node = create_leader_node(1);
let result = node.add_node(2, "addr2".to_string());
assert!(result.is_err());
match result {
Err(RaftError::NodeAlreadyMember { node_id }) => {
assert_eq!(node_id, 2);
}
other => panic!("Expected NodeAlreadyMember, got {:?}", other),
}
}
#[test]
fn test_remove_nonexistent_node_is_error() {
let node = create_leader_node(1);
let result = node.remove_node(99);
assert!(result.is_err());
match result {
Err(RaftError::NodeNotMember { node_id }) => {
assert_eq!(node_id, 99);
}
other => panic!("Expected NodeNotMember, got {:?}", other),
}
}
#[test]
fn test_non_leader_cannot_propose_membership_change() {
let node = create_test_node(1); let result = node.add_node(4, "addr4".to_string());
assert!(result.is_err());
match result {
Err(RaftError::NotLeader { .. }) => {}
other => panic!("Expected NotLeader, got {:?}", other),
}
}
#[test]
fn test_basic_replication_leader_sends_follower_appends() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
let requests = leader.replicate_to_followers();
assert!(
!requests.is_empty(),
"leader should have replication requests"
);
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("should have request for peer 2");
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
let resp = follower.handle_append_entries(req.clone());
assert!(resp.success, "follower should accept valid entries");
assert_eq!(resp.last_log_index, 2, "follower should have 2 entries");
let log = follower.log.read();
assert_eq!(log.last_index(), 2);
assert_eq!(log.last_term(), leader.current_term());
}
#[test]
fn test_log_consistency_check_passes() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
let requests = leader.replicate_to_followers();
let (_, req1) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp1 = follower.handle_append_entries(req1.clone());
assert!(resp1.success);
leader
.handle_replication_response(2, resp1)
.expect("handle response");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
let requests2 = leader.replicate_to_followers();
let (_, req2) = requests2
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
assert_eq!(req2.prev_log_index, 1, "prev should point to first entry");
assert_eq!(req2.prev_log_term, leader.current_term());
let resp2 = follower.handle_append_entries(req2.clone());
assert!(resp2.success, "consistency check should pass");
assert_eq!(resp2.last_log_index, 2);
}
#[test]
fn test_log_inconsistency_follower_rejects_leader_backs_up() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
for i in 1..=3 {
leader
.propose(Command::from_str(&format!("cmd{}", i)))
.expect("propose");
}
let term = leader.current_term();
let req = AppendEntriesRequest::new(
term,
1, 3, term, vec![], 0, );
let resp = follower.handle_append_entries(req);
assert!(!resp.success, "follower should reject -- missing prev_log");
assert!(
resp.conflict_index.is_some(),
"should have conflict index for fast backup"
);
leader
.handle_replication_response(2, resp)
.expect("handle response");
let leader_state_guard = leader.leader_state.read();
let leader_state = leader_state_guard
.as_ref()
.expect("leader state should exist");
let next_index = leader_state.get_next_index(2);
assert!(
next_index <= 1,
"next_index should be backed up, got {}",
next_index
);
}
#[test]
fn test_commit_index_advancement_after_majority() {
let leader = create_leader_node(1);
let follower2 = create_test_node(2);
let follower3 = create_test_node(3);
{
let term = leader.current_term();
follower2.persistent.write().current_term = term;
follower3.persistent.write().current_term = term;
}
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
assert_eq!(leader.commit_index(), 0, "not committed yet");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp = follower2.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
assert_eq!(
leader.commit_index(),
2,
"commit index should advance after majority replication"
);
let requests = leader.replicate_to_followers();
if let Some((_, req)) = requests.iter().find(|(peer, _)| *peer == 3) {
let resp = follower3.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(3, resp)
.expect("handle response");
}
}
#[test]
fn test_heartbeat_resets_election_timer() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
let req = AppendEntriesRequest::heartbeat(1, 1, 0, 0, 0);
let before = std::time::Instant::now();
let resp = follower.handle_append_entries(req);
assert!(resp.success, "heartbeat should succeed");
let elapsed = before.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"election timer should have been reset recently"
);
assert_eq!(
follower.leader_id(),
Some(1),
"follower should know the leader"
);
}
#[test]
fn test_stale_term_rejection() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 5;
}
let req = AppendEntriesRequest::heartbeat(3, 1, 0, 0, 0);
let resp = follower.handle_append_entries(req);
assert!(!resp.success, "should reject stale term");
assert_eq!(resp.term, 5, "should return current term");
}
#[test]
fn test_follower_overwrites_conflicting_entries() {
let follower = create_test_node(2);
{
let mut log = follower.log.write();
log.append(1, Command::from_str("old_cmd1"));
log.append(1, Command::from_str("old_cmd2"));
log.append(1, Command::from_str("old_cmd3"));
}
{
let mut persistent = follower.persistent.write();
persistent.current_term = 2;
}
let entries = vec![
LogEntry::new(2, 2, Command::from_str("new_cmd2")),
LogEntry::new(2, 3, Command::from_str("new_cmd3")),
];
let req = AppendEntriesRequest::new(
2, 1, 1, 1, entries, 0, );
let resp = follower.handle_append_entries(req);
assert!(
resp.success,
"should accept and overwrite conflicting entries"
);
assert_eq!(resp.last_log_index, 3);
let log = follower.log.read();
let entry2 = log.get(2).expect("entry 2 should exist");
assert_eq!(entry2.term, 2, "entry 2 should have new term");
assert_eq!(entry2.command.data, b"new_cmd2");
let entry3 = log.get(3).expect("entry 3 should exist");
assert_eq!(entry3.term, 2, "entry 3 should have new term");
assert_eq!(entry3.command.data, b"new_cmd3");
}
#[test]
fn test_fast_catchup_with_conflict_hint() {
let leader = create_leader_node(1);
for i in 1..=5 {
leader
.propose(Command::from_str(&format!("cmd{}", i)))
.expect("propose");
}
{
let mut ls = leader.leader_state.write();
let state = ls.as_mut().expect("leader state");
state.next_index.insert(2, 6);
}
let resp = AppendEntriesResponse::failure(
leader.current_term(),
2, 2, 1, );
{
let ls = leader.leader_state.read();
let state = ls.as_ref().expect("leader state");
assert_eq!(state.get_next_index(2), 6);
}
leader
.handle_replication_response(2, resp)
.expect("handle response");
{
let ls = leader.leader_state.read();
let state = ls.as_ref().expect("leader state");
let next = state.get_next_index(2);
assert!(
next <= 2,
"next_index should be backed up to conflict point, got {}",
next
);
}
}
#[test]
fn test_only_commit_entries_from_current_term() {
let leader = create_leader_node(1);
let follower2 = create_test_node(2);
let leader_term = leader.current_term();
follower2.persistent.write().current_term = leader_term;
{
let mut log = leader.log.write();
}
leader.propose(Command::from_str("cmd1")).expect("propose");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp = follower2.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
assert_eq!(leader.commit_index(), 1);
}
#[test]
fn test_heartbeat_with_no_entries_succeeds() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
{
let mut log = follower.log.write();
log.append(1, Command::from_str("cmd1"));
log.append(1, Command::from_str("cmd2"));
}
let req = AppendEntriesRequest::heartbeat(1, 1, 2, 1, 0);
let resp = follower.handle_append_entries(req);
assert!(resp.success);
assert_eq!(resp.last_log_index, 2);
}
#[test]
fn test_heartbeat_advances_follower_commit_index() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
{
let mut log = follower.log.write();
log.append(1, Command::from_str("cmd1"));
log.append(1, Command::from_str("cmd2"));
}
assert_eq!(follower.commit_index(), 0);
let req = AppendEntriesRequest::heartbeat(1, 1, 2, 1, 2);
let resp = follower.handle_append_entries(req);
assert!(resp.success);
assert_eq!(
follower.commit_index(),
2,
"follower commit index should advance via heartbeat"
);
}