use super::*;
use tracing_test::traced_test;
fn create_test_node(node_id: NodeId) -> RaftNode {
let config = RaftConfig::new(node_id, vec![1, 2, 3]);
RaftNode::new(config).expect("Failed to create node")
}
fn create_test_node_with_snapshots(node_id: NodeId) -> (RaftNode, tempfile::TempDir) {
let dir = tempfile::TempDir::new().expect("Failed to create temp dir");
let mut config = RaftConfig::new(node_id, vec![1, 2, 3]);
config.snapshot_dir = Some(dir.path().to_path_buf());
config.snapshot_threshold = 5;
let node = RaftNode::new(config).expect("Failed to create node");
(node, dir)
}
#[test]
fn test_new_node() {
let node = create_test_node(1);
assert_eq!(node.node_id(), 1);
assert_eq!(node.current_term(), 0);
assert_eq!(node.state(), NodeState::Follower);
assert_eq!(node.leader_id(), None);
}
#[test]
fn test_start_election() {
let node = create_test_node(1);
let requests = node.start_election();
assert_eq!(node.state(), NodeState::Candidate);
assert_eq!(node.current_term(), 1);
assert_eq!(requests.len(), 2); }
#[test]
fn test_handle_vote_granted() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
let became_leader = node.handle_vote_response(2, resp);
assert!(became_leader);
assert_eq!(node.state(), NodeState::Leader);
}
#[test]
fn test_propose_as_follower() {
let node = create_test_node(1);
let result = node.propose(Command::from_str("test"));
assert!(result.is_err());
}
#[test]
fn test_propose_as_leader() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
let result = node.propose(Command::from_str("test"));
assert!(result.is_ok());
}
#[test]
fn test_maybe_create_snapshot_below_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..3 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(3).expect("Set commit should succeed");
log.set_applied_index(3)
.expect("Set applied should succeed");
}
let created = node
.maybe_create_snapshot(b"state data".to_vec())
.expect("maybe_create_snapshot should succeed");
assert!(!created);
}
#[test]
fn test_maybe_create_snapshot_above_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..6 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("Set commit should succeed");
log.set_applied_index(6)
.expect("Set applied should succeed");
}
let created = node
.maybe_create_snapshot(b"full state".to_vec())
.expect("maybe_create_snapshot should succeed");
assert!(created);
let log = node.log.read();
assert_eq!(log.snapshot_index(), 6);
assert!(log.is_empty());
}
#[test]
fn test_handle_install_snapshot_rpc() {
let (node, _dir) = create_test_node_with_snapshots(1);
{
let mut persistent = node.persistent.write();
persistent.current_term = 5;
}
let req = InstallSnapshotRequest::new_complete(5, 2, 100, 4, b"snapshot data".to_vec());
let resp = node
.handle_install_snapshot(req)
.expect("handle_install_snapshot should succeed");
assert_eq!(resp.term, 5);
let log = node.log.read();
assert_eq!(log.last_index(), 100);
assert_eq!(log.snapshot_index(), 100);
assert_eq!(log.snapshot_term(), 4);
}
#[test]
fn test_handle_install_snapshot_stale_term() {
let (node, _dir) = create_test_node_with_snapshots(1);
{
let mut persistent = node.persistent.write();
persistent.current_term = 10;
}
let req = InstallSnapshotRequest::new_complete(5, 2, 50, 3, b"old data".to_vec());
let resp = node
.handle_install_snapshot(req)
.expect("handle_install_snapshot should succeed");
assert_eq!(resp.term, 10);
let log = node.log.read();
assert_eq!(log.last_index(), 0);
}
#[test]
fn test_follower_needs_snapshot() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..6 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("Set commit should succeed");
log.set_applied_index(6)
.expect("Set applied should succeed");
}
node.maybe_create_snapshot(b"state".to_vec())
.expect("Snapshot should succeed");
{
let mut leader_state_guard = node.leader_state.write();
if let Some(state) = leader_state_guard.as_mut() {
state.next_index.insert(2, 7);
state.next_index.insert(3, 3);
}
}
assert!(node.follower_needs_snapshot(3));
assert!(!node.follower_needs_snapshot(2));
}
#[test]
fn test_raft_node_with_persistence() {
use crate::persistence::MemoryPersistence;
let mp: Arc<dyn RaftPersistence> = Arc::new(MemoryPersistence::new());
{
let config = RaftConfig::new(1, vec![1, 2, 3]);
let node =
RaftNode::with_persistence(config, Arc::clone(&mp)).expect("create with persistence");
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
node.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
node.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
}
{
let config = RaftConfig::new(1, vec![1, 2, 3]);
let node =
RaftNode::with_persistence(config, Arc::clone(&mp)).expect("recover with persistence");
assert_eq!(node.current_term(), 1);
assert_eq!(node.last_log_index(), 2);
assert_eq!(node.state(), NodeState::Follower);
}
}
fn create_leader_node(node_id: NodeId) -> RaftNode {
let node = create_test_node(node_id);
node.start_election();
let resp = RequestVoteResponse::granted(node.current_term());
node.handle_vote_response(if node_id == 1 { 2 } else { 1 }, resp);
assert_eq!(node.state(), NodeState::Leader);
node
}
fn create_leader_5node(node_id: NodeId) -> RaftNode {
let config = RaftConfig::new(node_id, vec![1, 2, 3, 4, 5]);
let node = RaftNode::new(config).expect("Failed to create 5-node");
node.start_election();
let term = node.current_term();
let peers: Vec<NodeId> = vec![1, 2, 3, 4, 5]
.into_iter()
.filter(|&p| p != node_id)
.collect();
node.handle_vote_response(peers[0], RequestVoteResponse::granted(term));
node.handle_vote_response(peers[1], RequestVoteResponse::granted(term));
assert_eq!(node.state(), NodeState::Leader);
node
}
#[test]
fn test_add_node_3_to_4() {
let node = create_leader_node(1);
assert!(!node.is_in_joint_consensus());
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
assert!(node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert!(ids.contains(&4));
assert_eq!(ids.len(), 4);
node.commit_membership_change()
.expect("commit should succeed");
assert!(!node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert_eq!(ids.len(), 4);
assert!(ids.contains(&4));
}
#[test]
fn test_remove_node_5_to_4() {
let node = create_leader_5node(1);
assert!(!node.is_in_joint_consensus());
node.remove_node(5).expect("remove_node should succeed");
assert!(node.is_in_joint_consensus());
node.commit_membership_change()
.expect("commit should succeed");
assert!(!node.is_in_joint_consensus());
let members = node.cluster_members();
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert_eq!(ids.len(), 4);
assert!(!ids.contains(&5));
}
#[test]
fn test_reject_concurrent_membership_changes() {
let node = create_leader_node(1);
node.add_node(4, "addr4".to_string())
.expect("first add_node should succeed");
assert!(node.is_in_joint_consensus());
let result = node.add_node(5, "addr5".to_string());
assert!(result.is_err());
match result {
Err(RaftError::MembershipChangeInProgress) => {}
other => panic!("Expected MembershipChangeInProgress, got {:?}", other),
}
}
#[test]
fn test_joint_consensus_quorum_requires_both_configs() {
let node = create_leader_node(1);
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
let mut responding = std::collections::HashSet::new();
responding.insert(1u64);
responding.insert(2);
assert!(!node.has_quorum(&responding));
responding.insert(3);
assert!(node.has_quorum(&responding));
}
#[test]
fn test_leader_removal_triggers_step_down() {
let node = create_leader_node(1);
assert!(!node.is_stepping_down());
node.remove_node(1)
.expect("remove_node(self) should succeed");
node.commit_membership_change()
.expect("commit should succeed");
assert!(node.is_stepping_down());
assert_eq!(node.state(), NodeState::Follower);
}
#[test]
fn test_membership_version_increments() {
let node = create_leader_node(1);
let v0 = node.membership_version();
node.add_node(4, "addr4".to_string())
.expect("add_node should succeed");
let v1 = node.membership_version();
assert!(v1 > v0, "version should increase after entering joint");
node.commit_membership_change()
.expect("commit should succeed");
let v2 = node.membership_version();
assert!(v2 >= v1);
}
#[test]
fn test_get_current_members() {
let node = create_leader_node(1);
let members = node.cluster_members();
assert_eq!(members.len(), 3);
let ids: std::collections::HashSet<NodeId> = members.iter().map(|(id, _)| *id).collect();
assert!(ids.contains(&1));
assert!(ids.contains(&2));
assert!(ids.contains(&3));
}
#[test]
fn test_add_node_already_member_is_error() {
let node = create_leader_node(1);
let result = node.add_node(2, "addr2".to_string());
assert!(result.is_err());
match result {
Err(RaftError::NodeAlreadyMember { node_id }) => {
assert_eq!(node_id, 2);
}
other => panic!("Expected NodeAlreadyMember, got {:?}", other),
}
}
#[test]
fn test_remove_nonexistent_node_is_error() {
let node = create_leader_node(1);
let result = node.remove_node(99);
assert!(result.is_err());
match result {
Err(RaftError::NodeNotMember { node_id }) => {
assert_eq!(node_id, 99);
}
other => panic!("Expected NodeNotMember, got {:?}", other),
}
}
#[test]
fn test_non_leader_cannot_propose_membership_change() {
let node = create_test_node(1); let result = node.add_node(4, "addr4".to_string());
assert!(result.is_err());
match result {
Err(RaftError::NotLeader { .. }) => {}
other => panic!("Expected NotLeader, got {:?}", other),
}
}
#[test]
fn test_basic_replication_leader_sends_follower_appends() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
let requests = leader.replicate_to_followers();
assert!(
!requests.is_empty(),
"leader should have replication requests"
);
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("should have request for peer 2");
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
let resp = follower.handle_append_entries(req.clone());
assert!(resp.success, "follower should accept valid entries");
assert_eq!(resp.last_log_index, 2, "follower should have 2 entries");
let log = follower.log.read();
assert_eq!(log.last_index(), 2);
assert_eq!(log.last_term(), leader.current_term());
}
#[test]
fn test_log_consistency_check_passes() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
let requests = leader.replicate_to_followers();
let (_, req1) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp1 = follower.handle_append_entries(req1.clone());
assert!(resp1.success);
leader
.handle_replication_response(2, resp1)
.expect("handle response");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
let requests2 = leader.replicate_to_followers();
let (_, req2) = requests2
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
assert_eq!(req2.prev_log_index, 1, "prev should point to first entry");
assert_eq!(req2.prev_log_term, leader.current_term());
let resp2 = follower.handle_append_entries(req2.clone());
assert!(resp2.success, "consistency check should pass");
assert_eq!(resp2.last_log_index, 2);
}
#[test]
fn test_log_inconsistency_follower_rejects_leader_backs_up() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = leader.current_term();
}
for i in 1..=3 {
leader
.propose(Command::from_str(&format!("cmd{}", i)))
.expect("propose");
}
let term = leader.current_term();
let req = AppendEntriesRequest::new(
term,
1, 3, term, vec![], 0, );
let resp = follower.handle_append_entries(req);
assert!(!resp.success, "follower should reject -- missing prev_log");
assert!(
resp.conflict_index.is_some(),
"should have conflict index for fast backup"
);
leader
.handle_replication_response(2, resp)
.expect("handle response");
let leader_state_guard = leader.leader_state.read();
let leader_state = leader_state_guard
.as_ref()
.expect("leader state should exist");
let next_index = leader_state.get_next_index(2);
assert!(
next_index <= 1,
"next_index should be backed up, got {}",
next_index
);
}
#[test]
fn test_commit_index_advancement_after_majority() {
let leader = create_leader_node(1);
let follower2 = create_test_node(2);
let follower3 = create_test_node(3);
{
let term = leader.current_term();
follower2.persistent.write().current_term = term;
follower3.persistent.write().current_term = term;
}
leader
.propose(Command::from_str("cmd1"))
.expect("propose cmd1");
leader
.propose(Command::from_str("cmd2"))
.expect("propose cmd2");
assert_eq!(leader.commit_index(), 0, "not committed yet");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp = follower2.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
assert_eq!(
leader.commit_index(),
2,
"commit index should advance after majority replication"
);
let requests = leader.replicate_to_followers();
if let Some((_, req)) = requests.iter().find(|(peer, _)| *peer == 3) {
let resp = follower3.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(3, resp)
.expect("handle response");
}
}
#[test]
fn test_heartbeat_resets_election_timer() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
let req = AppendEntriesRequest::heartbeat(1, 1, 0, 0, 0);
let before = std::time::Instant::now();
let resp = follower.handle_append_entries(req);
assert!(resp.success, "heartbeat should succeed");
let elapsed = before.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"election timer should have been reset recently"
);
assert_eq!(
follower.leader_id(),
Some(1),
"follower should know the leader"
);
}
#[test]
fn test_stale_term_rejection() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 5;
}
let req = AppendEntriesRequest::heartbeat(3, 1, 0, 0, 0);
let resp = follower.handle_append_entries(req);
assert!(!resp.success, "should reject stale term");
assert_eq!(resp.term, 5, "should return current term");
}
#[test]
fn test_follower_overwrites_conflicting_entries() {
let follower = create_test_node(2);
{
let mut log = follower.log.write();
log.append(1, Command::from_str("old_cmd1"));
log.append(1, Command::from_str("old_cmd2"));
log.append(1, Command::from_str("old_cmd3"));
}
{
let mut persistent = follower.persistent.write();
persistent.current_term = 2;
}
let entries = vec![
LogEntry::new(2, 2, Command::from_str("new_cmd2")),
LogEntry::new(2, 3, Command::from_str("new_cmd3")),
];
let req = AppendEntriesRequest::new(
2, 1, 1, 1, entries, 0, );
let resp = follower.handle_append_entries(req);
assert!(
resp.success,
"should accept and overwrite conflicting entries"
);
assert_eq!(resp.last_log_index, 3);
let log = follower.log.read();
let entry2 = log.get(2).expect("entry 2 should exist");
assert_eq!(entry2.term, 2, "entry 2 should have new term");
assert_eq!(entry2.command.data, b"new_cmd2");
let entry3 = log.get(3).expect("entry 3 should exist");
assert_eq!(entry3.term, 2, "entry 3 should have new term");
assert_eq!(entry3.command.data, b"new_cmd3");
}
#[test]
fn test_fast_catchup_with_conflict_hint() {
let leader = create_leader_node(1);
for i in 1..=5 {
leader
.propose(Command::from_str(&format!("cmd{}", i)))
.expect("propose");
}
{
let mut ls = leader.leader_state.write();
let state = ls.as_mut().expect("leader state");
state.next_index.insert(2, 6);
}
let resp = AppendEntriesResponse::failure(
leader.current_term(),
2, 2, 1, );
{
let ls = leader.leader_state.read();
let state = ls.as_ref().expect("leader state");
assert_eq!(state.get_next_index(2), 6);
}
leader
.handle_replication_response(2, resp)
.expect("handle response");
{
let ls = leader.leader_state.read();
let state = ls.as_ref().expect("leader state");
let next = state.get_next_index(2);
assert!(
next <= 2,
"next_index should be backed up to conflict point, got {}",
next
);
}
}
#[test]
fn test_only_commit_entries_from_current_term() {
let leader = create_leader_node(1);
let follower2 = create_test_node(2);
let leader_term = leader.current_term();
follower2.persistent.write().current_term = leader_term;
{
let mut log = leader.log.write();
}
leader.propose(Command::from_str("cmd1")).expect("propose");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp = follower2.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
assert_eq!(leader.commit_index(), 1);
}
#[test]
fn test_heartbeat_with_no_entries_succeeds() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
{
let mut log = follower.log.write();
log.append(1, Command::from_str("cmd1"));
log.append(1, Command::from_str("cmd2"));
}
let req = AppendEntriesRequest::heartbeat(1, 1, 2, 1, 0);
let resp = follower.handle_append_entries(req);
assert!(resp.success);
assert_eq!(resp.last_log_index, 2);
}
#[test]
fn test_heartbeat_advances_follower_commit_index() {
let follower = create_test_node(2);
{
let mut persistent = follower.persistent.write();
persistent.current_term = 1;
}
{
let mut log = follower.log.write();
log.append(1, Command::from_str("cmd1"));
log.append(1, Command::from_str("cmd2"));
}
assert_eq!(follower.commit_index(), 0);
let req = AppendEntriesRequest::heartbeat(1, 1, 2, 1, 2);
let resp = follower.handle_append_entries(req);
assert!(resp.success);
assert_eq!(
follower.commit_index(),
2,
"follower commit index should advance via heartbeat"
);
}
#[test]
fn test_replicate_to_followers_returns_nothing_when_caught_up() {
let leader = create_leader_node(1);
let requests = leader.replicate_to_followers();
assert!(
requests.is_empty(),
"no replication requests when all followers are caught up"
);
}
#[test]
fn test_create_replication_request_for_specific_peer() {
let leader = create_leader_node(1);
leader.propose(Command::from_str("cmd1")).expect("propose");
let req = leader.create_replication_request_for(2);
assert!(req.is_some(), "should have request for peer 2");
let req = req.expect("request for peer 2");
assert_eq!(req.entries.len(), 1);
assert_eq!(req.leader_id, 1);
let follower = create_test_node(2);
assert!(follower.create_replication_request_for(3).is_none());
}
#[test]
fn test_leader_steps_down_on_higher_term_in_response() {
let leader = create_leader_node(1);
assert_eq!(leader.state(), NodeState::Leader);
leader.propose(Command::from_str("cmd1")).expect("propose");
let resp = AppendEntriesResponse::rejected(leader.current_term() + 5);
leader
.handle_replication_response(2, resp)
.expect("handle response");
assert_eq!(
leader.state(),
NodeState::Follower,
"leader should step down on higher term"
);
}
#[test]
fn test_candidate_steps_down_on_append_entries() {
let candidate = create_test_node(1);
candidate.start_election();
assert_eq!(candidate.state(), NodeState::Candidate);
let candidate_term = candidate.current_term();
let req = AppendEntriesRequest::heartbeat(candidate_term, 2, 0, 0, 0);
let resp = candidate.handle_append_entries(req);
assert!(resp.success);
assert_eq!(
candidate.state(),
NodeState::Follower,
"candidate should step down to follower"
);
assert_eq!(candidate.leader_id(), Some(2));
}
#[test]
fn test_replication_multiple_rounds() {
let leader = create_leader_node(1);
let follower = create_test_node(2);
let term = leader.current_term();
follower.persistent.write().current_term = term;
leader.propose(Command::from_str("cmd1")).expect("propose");
leader.propose(Command::from_str("cmd2")).expect("propose");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
let resp = follower.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
leader.propose(Command::from_str("cmd3")).expect("propose");
leader.propose(Command::from_str("cmd4")).expect("propose");
let requests = leader.replicate_to_followers();
let (_, req) = requests
.iter()
.find(|(peer, _)| *peer == 2)
.expect("request for peer 2");
assert_eq!(
req.entries.len(),
2,
"should only send new entries, not already replicated ones"
);
assert_eq!(req.entries[0].index, 3);
assert_eq!(req.entries[1].index, 4);
assert_eq!(
req.prev_log_index, 2,
"prev should point to last replicated"
);
let resp = follower.handle_append_entries(req.clone());
assert!(resp.success);
assert_eq!(resp.last_log_index, 4);
}
#[test]
fn test_commit_index_joint_consensus() {
let leader = create_leader_5node(1);
let follower2 = create_test_node(2);
let config3 = RaftConfig::new(3, vec![1, 2, 3, 4, 5]);
let follower3 = RaftNode::new(config3).expect("create node 3");
let term = leader.current_term();
follower2.persistent.write().current_term = term;
follower3.persistent.write().current_term = term;
leader.propose(Command::from_str("cmd1")).expect("propose");
let requests = leader.replicate_to_followers();
if let Some((_, req)) = requests.iter().find(|(peer, _)| *peer == 2) {
let resp = follower2.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(2, resp)
.expect("handle response");
}
let requests = leader.replicate_to_followers();
if let Some((_, req)) = requests.iter().find(|(peer, _)| *peer == 3) {
let resp = follower3.handle_append_entries(req.clone());
assert!(resp.success);
leader
.handle_replication_response(3, resp)
.expect("handle response");
}
assert_eq!(
leader.commit_index(),
1,
"commit index should advance with 3/5 quorum"
);
}
#[test]
fn test_append_entries_updates_follower_state_to_follower() {
let node = create_test_node(1);
node.start_election();
assert_eq!(node.state(), NodeState::Candidate);
let term = node.current_term();
let req = AppendEntriesRequest::heartbeat(term + 1, 2, 0, 0, 0);
let resp = node.handle_append_entries(req);
assert!(resp.success);
assert_eq!(node.state(), NodeState::Follower);
assert_eq!(node.leader_id(), Some(2));
assert_eq!(node.current_term(), term + 1);
}
#[test]
fn test_auto_snapshot_below_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..3 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(3).expect("ok");
log.set_applied_index(3).expect("ok");
}
let policy = SnapshotPolicy::new(5);
let created = node
.auto_snapshot_if_needed(&policy, || Ok(b"state".to_vec()))
.expect("ok");
assert!(!created);
}
#[test]
fn test_auto_snapshot_above_threshold() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
for i in 0..6 {
node.propose(Command::from_str(&format!("cmd{}", i)))
.expect("Propose should succeed");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("ok");
log.set_applied_index(6).expect("ok");
}
let policy = SnapshotPolicy::new(5);
let created = node
.auto_snapshot_if_needed(&policy, || Ok(b"state".to_vec()))
.expect("ok");
assert!(created);
}
#[test]
fn test_auto_snapshot_multiple_cycles() {
let (node, _dir) = create_test_node_with_snapshots(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
let policy = SnapshotPolicy::new(5);
for i in 0..6 {
node.propose(Command::from_str(&format!("a{}", i)))
.expect("ok");
}
{
let mut log = node.log.write();
log.set_commit_index(6).expect("ok");
log.set_applied_index(6).expect("ok");
}
let created = node
.auto_snapshot_if_needed(&policy, || Ok(b"state1".to_vec()))
.expect("ok");
assert!(created);
for i in 0..6 {
node.propose(Command::from_str(&format!("b{}", i)))
.expect("ok");
}
{
let mut log = node.log.write();
log.set_commit_index(12).expect("ok");
log.set_applied_index(12).expect("ok");
}
let created = node
.auto_snapshot_if_needed(&policy, || Ok(b"state2".to_vec()))
.expect("ok");
assert!(created);
}
#[traced_test]
#[test]
fn test_state_transitions_are_traced() {
let node = create_test_node(1);
let _requests = node.start_election();
assert_eq!(node.state(), NodeState::Candidate);
let resp = RequestVoteResponse::granted(node.current_term());
let became_leader = node.handle_vote_response(2, resp);
assert!(became_leader);
assert_eq!(node.state(), NodeState::Leader);
let idx = node
.propose(Command::from_str("traced_cmd"))
.expect("propose ok");
assert!(idx > 0);
let higher_term_req = AppendEntriesRequest::heartbeat(3, 2, 0, 0, 0);
let ae_resp = node.handle_append_entries(higher_term_req);
assert!(ae_resp.success);
assert_eq!(node.state(), NodeState::Follower);
assert!(logs_contain("Started election"));
assert!(logs_contain("Won election with quorum"));
assert!(logs_contain("Became leader"));
assert!(logs_contain("Stepped down to follower"));
}
fn wal_replay_test_dir(name: &str) -> std::path::PathBuf {
let dir = std::env::temp_dir().join(format!(
"amaters_wal_replay_{name}_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
));
let _ = std::fs::remove_dir_all(&dir);
dir
}
#[test]
fn test_wal_replay_on_startup_basic() {
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("basic");
let wal_dir = base.join("wal");
let persist_dir = base.join("persist");
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("create writer");
for i in 1..=5 {
let entry = LogEntry::new(1, i, Command::from_str(&format!("cmd-{i}")));
writer.append(&entry).expect("append");
}
drop(writer);
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.persistence_dir = Some(persist_dir.clone());
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 5);
{
let log = node.log.read();
let entry = log.get(3).expect("entry at index 3");
assert_eq!(entry.term, 1);
assert_eq!(entry.command.data, b"cmd-3");
}
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_merges_with_persistence() {
use crate::persistence::FilePersistence;
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("merge");
let wal_dir = base.join("wal");
let persist_dir = base.join("persist");
let fp = FilePersistence::new(&persist_dir, true).expect("create persistence");
let persist_entries: Vec<LogEntry> = (1..=3)
.map(|i| LogEntry::new(1, i, Command::from_str(&format!("cmd-{i}"))))
.collect();
fp.append_entries(&persist_entries)
.expect("persist entries");
fp.save_state(1, None).expect("save state");
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("create writer");
for i in 1..=5 {
let entry = LogEntry::new(1, i, Command::from_str(&format!("cmd-{i}")));
writer.append(&entry).expect("append");
}
drop(writer);
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.persistence_dir = Some(persist_dir.clone());
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 5);
{
let log = node.log.read();
let e5 = log.get(5).expect("entry 5");
assert_eq!(e5.command.data, b"cmd-5");
}
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_with_applied_index_recovery() {
use crate::persistence::FilePersistence;
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("applied");
let wal_dir = base.join("wal");
let persist_dir = base.join("persist");
let fp = FilePersistence::new(&persist_dir, true).expect("create persistence");
let entries: Vec<LogEntry> = (1..=5)
.map(|i| LogEntry::new(1, i, Command::from_str(&format!("cmd-{i}"))))
.collect();
fp.append_entries(&entries).expect("persist");
fp.save_state(1, None).expect("save state");
fp.save_applied_index(3).expect("save applied");
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
for i in 1..=7 {
writer
.append(&LogEntry::new(1, i, Command::from_str(&format!("wal-{i}"))))
.expect("append");
}
drop(writer);
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.persistence_dir = Some(persist_dir.clone());
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 7);
{
let log = node.log.read();
assert_eq!(log.applied_index(), 3);
assert_eq!(log.commit_index(), 3);
}
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_crash_recovery_partial_entry() {
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("crash_partial");
let wal_dir = base.join("wal");
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
for i in 1..=5 {
writer
.append(&LogEntry::new(1, i, Command::from_str(&format!("cmd-{i}"))))
.expect("append");
}
drop(writer);
let seg_files: Vec<_> = std::fs::read_dir(&wal_dir)
.expect("readdir")
.filter_map(|e| e.ok())
.filter(|e| e.file_name().to_string_lossy().ends_with(".seg"))
.collect();
assert!(!seg_files.is_empty());
let last_seg = &seg_files[seg_files.len() - 1];
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(last_seg.path())
.expect("open seg");
use std::io::Write;
f.write_all(&[0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02])
.expect("write garbage");
drop(f);
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 5);
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_empty_wal_dir() {
let base = wal_replay_test_dir("empty_wal");
let wal_dir = base.join("wal");
std::fs::create_dir_all(&wal_dir).expect("create wal dir");
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 0);
assert_eq!(node.commit_index(), 0);
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_with_persistence_backend() {
use crate::persistence::FilePersistence;
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("with_persist");
let wal_dir = base.join("wal");
let persist_dir = base.join("persist");
let fp = FilePersistence::new(&persist_dir, true).expect("create fp");
let entries: Vec<LogEntry> = (1..=3)
.map(|i| LogEntry::new(2, i, Command::from_str(&format!("p-{i}"))))
.collect();
fp.append_entries(&entries).expect("persist");
fp.save_state(2, Some(1)).expect("save state");
fp.save_applied_index(2).expect("save applied");
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
for i in 1..=6 {
writer
.append(&LogEntry::new(2, i, Command::from_str(&format!("w-{i}"))))
.expect("append");
}
drop(writer);
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let persistence: std::sync::Arc<dyn RaftPersistence> = std::sync::Arc::new(fp);
let node = RaftNode::with_persistence(config, persistence).expect("create node");
assert_eq!(node.last_log_index(), 6);
assert_eq!(node.current_term(), 2);
{
let log = node.log.read();
assert_eq!(log.applied_index(), 2);
assert_eq!(log.commit_index(), 2);
}
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_single_op() {
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("b1_single_op");
let wal_dir = base.join("wal");
{
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
writer
.append(&LogEntry::new(1, 1, Command::from_str("single-op")))
.expect("append");
}
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), 1);
{
let log = node.log.read();
let entry = log.get(1).expect("entry at index 1");
assert_eq!(entry.command.data, b"single-op");
}
assert!(!node.is_recovering());
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_multi_op_restart() {
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("b1_multi_op");
let wal_dir = base.join("wal");
const N: u64 = 10;
{
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
for i in 1..=N {
writer
.append(&LogEntry::new(1, i, Command::from_str(&format!("op-{i}"))))
.expect("append");
}
}
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let node = RaftNode::new(config).expect("create node");
assert_eq!(node.last_log_index(), N);
{
let log = node.log.read();
for i in 1..=N {
let entry = log.get(i).expect("entry exists");
assert_eq!(entry.command.data, format!("op-{i}").as_bytes());
}
}
assert!(!node.is_recovering());
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_wal_replay_ignored_after_snapshot() {
use crate::persistence::FilePersistence;
use crate::wal::{SyncMode, WalWriter};
let base = wal_replay_test_dir("b1_ignored_after_snap");
let wal_dir = base.join("wal");
let persist_dir = base.join("persist");
let fp = FilePersistence::new(&persist_dir, true).expect("create fp");
let entries: Vec<LogEntry> = (1..=5)
.map(|i| LogEntry::new(1, i, Command::from_str(&format!("snap-{i}"))))
.collect();
fp.append_entries(&entries).expect("persist");
fp.save_state(1, None).expect("save state");
fp.save_applied_index(5).expect("save applied");
{
let mut writer =
WalWriter::new(&wal_dir, SyncMode::EveryWrite, 64 * 1024 * 1024).expect("writer");
for i in 1..=5 {
writer
.append(&LogEntry::new(1, i, Command::from_str(&format!("wal-{i}"))))
.expect("append");
}
}
let mut config = RaftConfig::new(1, vec![1, 2, 3]);
config.wal_dir = Some(wal_dir.clone());
let persistence: std::sync::Arc<dyn RaftPersistence> = std::sync::Arc::new(fp);
let node = RaftNode::with_persistence(config, persistence).expect("create node");
assert_eq!(node.last_log_index(), 5);
{
let log = node.log.read();
assert_eq!(log.applied_index(), 5);
}
assert!(!node.is_recovering());
let _ = std::fs::remove_dir_all(&base);
}
#[test]
fn test_fencing_token_new() {
let token = FencingToken::new(5, 0);
assert_eq!(token.term(), 5);
assert_eq!(token.seq(), 0);
assert_eq!(token.raw(), (5u64 << 32));
}
#[test]
fn test_fencing_token_bump_seq_increments_sequence() {
let token = FencingToken::new(5, 0);
let t1 = token.bump_seq();
assert_eq!(t1.seq(), 1);
assert_eq!(t1.term(), 5);
let t2 = t1.bump_seq();
assert_eq!(t2.seq(), 2);
}
#[test]
fn test_fencing_token_new_leader_term_resets_seq() {
let token = FencingToken::new_leader_term(3);
assert_eq!(token.term(), 3);
assert_eq!(token.seq(), 0);
}
#[test]
fn test_fencing_packed_representation_roundtrip() {
let original_term: u32 = 42;
let original_seq: u32 = 1337;
let token = FencingToken::new(original_term, original_seq);
assert_eq!(token.term(), original_term);
assert_eq!(token.seq(), original_seq);
let raw = token.raw();
let reconstructed = FencingToken(raw);
assert_eq!(reconstructed.term(), original_term);
assert_eq!(reconstructed.seq(), original_seq);
}
#[test]
fn test_fencing_token_state_issues_monotonic_tokens() {
use crate::state::FencingTokenState;
let state = FencingTokenState::new();
state.bump_term_token(5);
let t0 = state.issue_token();
let t1 = state.issue_token();
let t2 = state.issue_token();
assert_eq!(t0.term(), 5);
assert_eq!(t1.term(), 5);
assert_eq!(t2.term(), 5);
assert!(t1.seq() > t0.seq());
assert!(t2.seq() > t1.seq());
}
#[test]
fn test_fencing_token_leader_issues_tokens() {
let node = create_test_node(1);
assert!(node.issue_fencing_token().is_none());
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
assert_eq!(node.state(), NodeState::Leader);
let t0 = node.issue_fencing_token().expect("should issue token");
assert_eq!(t0.term(), 1);
let t1 = node.issue_fencing_token().expect("should issue token");
assert!(t1.seq() > t0.seq());
}
#[test]
fn test_fencing_token_validate_current_term() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
let token = node.issue_fencing_token().expect("should issue token");
assert!(node.validate_fencing_token(&token).is_ok());
}
#[test]
fn test_fencing_rejects_old_term() {
let node = create_test_node(1);
node.start_election();
let resp = RequestVoteResponse::granted(1);
node.handle_vote_response(2, resp);
let old_token = node.issue_fencing_token().expect("should issue token");
assert_eq!(old_token.term(), 1);
let higher_term = AppendEntriesRequest::heartbeat(5, 2, 0, 0, 0);
node.handle_append_entries(higher_term);
assert_eq!(node.state(), NodeState::Follower);
assert_eq!(node.current_term(), 5);
let result = node.validate_fencing_token(&old_token);
assert!(result.is_err());
match result {
Err(RaftError::StaleTerm { current, received }) => {
assert_eq!(current, 5);
assert_eq!(received, 1);
}
other => panic!("Expected StaleTerm, got {:?}", other),
}
}
#[test]
fn test_fencing_accepts_current_term() {
let node = create_test_node(1);
node.start_election();
node.handle_vote_response(2, RequestVoteResponse::granted(1));
let token = node.issue_fencing_token().expect("should issue token");
assert!(node.validate_fencing_token(&token).is_ok());
}
#[test]
fn test_fencing_monotonic_across_leadership_change() {
let node = create_test_node(1);
node.start_election();
node.handle_vote_response(2, RequestVoteResponse::granted(1));
let token_term1 = node.issue_fencing_token().expect("should issue token");
assert_eq!(token_term1.term(), 1);
let higher_term = AppendEntriesRequest::heartbeat(5, 2, 0, 0, 0);
node.handle_append_entries(higher_term);
assert_eq!(node.state(), NodeState::Follower);
node.start_election();
assert_eq!(node.current_term(), 6);
node.handle_vote_response(2, RequestVoteResponse::granted(6));
assert_eq!(node.state(), NodeState::Leader);
let token_term6 = node.issue_fencing_token().expect("should issue token");
assert_eq!(token_term6.term(), 6);
assert_eq!(token_term6.seq(), 0);
assert!(node.validate_fencing_token(&token_term1).is_err());
assert!(node.validate_fencing_token(&token_term6).is_ok());
assert!(token_term6 > token_term1);
}
#[test]
fn test_fencing_token_cleared_on_step_down() {
let node = create_test_node(1);
node.start_election();
node.handle_vote_response(2, RequestVoteResponse::granted(1));
assert!(node.issue_fencing_token().is_some());
let higher_term = AppendEntriesRequest::heartbeat(5, 2, 0, 0, 0);
node.handle_append_entries(higher_term);
assert_eq!(node.state(), NodeState::Follower);
assert!(node.issue_fencing_token().is_none());
}
#[test]
fn test_fencing_token_in_append_entries_request() {
let token = FencingToken::new(5, 1);
let req = AppendEntriesRequest::with_fencing_token(5, 1, 0, 0, Vec::new(), 0, token);
assert_eq!(req.fencing_token, Some(token));
}
#[test]
fn test_fencing_token_in_append_entries_response() {
let token = FencingToken::new(5, 1);
let resp = AppendEntriesResponse::success_with_token(5, 10, token);
assert_eq!(resp.fencing_token, Some(token));
assert!(resp.success);
}
#[test]
fn test_fencing_token_default_none_in_messages() {
let req = AppendEntriesRequest::new(5, 1, 0, 0, Vec::new(), 0);
assert!(req.fencing_token.is_none());
let resp = AppendEntriesResponse::success(5, 10);
assert!(resp.fencing_token.is_none());
let resp2 = AppendEntriesResponse::rejected(5);
assert!(resp2.fencing_token.is_none());
}