use std::sync::Arc;
use d_engine_proto::common::NodeRole::{Candidate, Follower, Leader, Learner};
use tokio::sync::mpsc;
use tokio::sync::watch;
use crate::NewCommitData;
use crate::RoleEvent;
use crate::test_utils::MockBuilder;
use crate::test_utils::MockTypeConfig;
fn is_follower(role_i32: i32) -> bool {
role_i32 == Follower as i32
}
fn is_candidate(role_i32: i32) -> bool {
role_i32 == Candidate as i32
}
fn is_leader(role_i32: i32) -> bool {
role_i32 == Leader as i32
}
fn is_learner(role_i32: i32) -> bool {
role_i32 == Learner as i32
}
fn prepare_succeed_majority_confirmation() -> (
crate::MockRaftLog,
crate::MockReplicationCore<MockTypeConfig>,
) {
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(move |_, _, _, _, _| {
Ok(crate::AppendResults {
commit_quorum_achieved: true,
learner_progress: std::collections::HashMap::new(),
peer_updates: std::collections::HashMap::new(),
})
});
(raft_log, replication_handler)
}
#[tokio::test]
async fn test_role_transition_follower_candidate_leader_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
assert!(raft.handle_role_event(RoleEvent::BecomeFollower(None)).await.is_err());
assert!(is_follower(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Follower should transition to Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Candidate should transition to Leader");
assert!(is_leader(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Leader should transition to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_follower_candidate_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Follower should transition to Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Candidate should step down to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_follower_candidate_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Follower should transition to Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Candidate should transition to Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_candidate_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(Some(2)))
.await
.expect("Candidate should step down to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_candidate_candidate_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeCandidate).await;
assert!(
result.is_err(),
"Candidate should not transition to Candidate"
);
assert!(is_candidate(raft.role.as_i32()), "Should remain Candidate");
}
#[tokio::test]
async fn test_role_transition_leader_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(Some(2)))
.await
.expect("Leader should step down to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_leader_candidate_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeCandidate).await;
assert!(result.is_err(), "Leader should not transition to Candidate");
assert!(is_leader(raft.role.as_i32()), "Should remain Leader");
}
#[tokio::test]
async fn test_role_transition_leader_leader_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeLeader).await;
assert!(result.is_err(), "Leader should not transition to Leader");
assert!(is_leader(raft.role.as_i32()), "Should remain Leader");
}
#[tokio::test]
async fn test_role_transition_learner_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Learner should transition to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_role_transition_learner_candidate_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeCandidate).await;
assert!(
result.is_err(),
"Learner should not transition to Candidate"
);
assert!(is_learner(raft.role.as_i32()), "Should remain Learner");
}
#[tokio::test]
async fn test_role_transition_learner_leader_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeLeader).await;
assert!(result.is_err(), "Learner should not transition to Leader");
assert!(is_learner(raft.role.as_i32()), "Should remain Learner");
}
#[tokio::test]
async fn test_role_transition_learner_learner_invalid() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
let result = raft.handle_role_event(RoleEvent::BecomeLearner).await;
assert!(result.is_err(), "Learner should not transition to Learner");
assert!(is_learner(raft.role.as_i32()), "Should remain Learner");
}
#[tokio::test]
async fn test_follower_become_with_known_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let leader_id = 3;
raft.handle_role_event(RoleEvent::BecomeFollower(Some(leader_id)))
.await
.expect_err("Initial follower should not be able to become follower again");
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeFollower(Some(leader_id)))
.await
.expect("Should transition to Follower with known leader");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_follower_become_without_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Should transition to Follower without known leader");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_follower_state_reset_voted_for() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Should transition to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_candidate_become_increments_term() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let initial_term = raft.role.current_term();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
let candidate_term = raft.role.current_term();
assert_eq!(
candidate_term, initial_term,
"Candidate term should be same as follower term after BecomeCandidate transition"
);
}
#[tokio::test]
async fn test_candidate_become_votes_for_self() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let _node_id = raft.ctx.node_id;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
}
#[tokio::test]
async fn test_candidate_become_clears_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_become_marks_vote_committed() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_become_initializes_peer_indices() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_become_initializes_metadata_cache() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_become_sends_noop_entry() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_verification_succeeds() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_verification_fails_downgrades() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(|_, _, _, _, _| Err(crate::Error::Fatal("Verification failed".to_string())));
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_handler;
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_single_node_cluster() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let mut membership = crate::MockMembership::new();
membership.expect_is_single_node_cluster().returning(|| true);
membership.expect_voters().returning(Vec::new);
membership.expect_replication_peers().returning(Vec::new);
membership.expect_get_peers_id_with_condition().returning(|_| Vec::new());
raft.ctx.membership = Arc::new(membership);
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_learner_become_non_voting() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
}
#[tokio::test]
async fn test_learner_promotion_to_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeFollower(None))
.await
.expect("Learner should be promoted to Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_multiple_learners_tracking() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
}
#[tokio::test]
async fn test_become_follower_sends_leader_notification() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (leader_tx, _leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
let leader_id = 2;
raft.handle_role_event(RoleEvent::BecomeFollower(Some(leader_id)))
.await
.expect("Should become Follower");
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_become_candidate_clears_leader_notification() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (leader_tx, _leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
assert!(is_candidate(raft.role.as_i32()));
}
#[tokio::test]
async fn test_become_leader_full_initialization() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let (leader_tx, _leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_ready_notification_only_after_noop_committed() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let (leader_tx, mut leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
leader_rx.changed().await.expect("Should receive leader ready notification");
let info = leader_rx.borrow().expect("Leader info must be Some after noop committed");
assert_eq!(info.leader_id, raft.node_id);
}
#[tokio::test]
async fn test_leader_ready_notification_suppressed_when_noop_fails() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(|_, _, _, _, _| {
Ok(crate::AppendResults {
commit_quorum_achieved: false, learner_progress: std::collections::HashMap::new(),
peer_updates: std::collections::HashMap::new(),
})
});
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_handler;
let (leader_tx, mut leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
let result =
tokio::time::timeout(tokio::time::Duration::from_millis(200), leader_rx.changed()).await;
if result.is_ok() {
assert!(
leader_rx.borrow().is_none(),
"Must not send Some(leader_id) when noop fails — no false leader ready signal"
);
}
}
#[tokio::test]
async fn test_become_learner_no_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (leader_tx, _leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeLearner)
.await
.expect("Should become Learner");
assert!(is_learner(raft.role.as_i32()));
}
#[tokio::test]
async fn test_notify_new_commit_index_broadcasts() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx2, mut rx2) = mpsc::unbounded_channel();
raft.register_new_commit_listener(tx1);
raft.register_new_commit_listener(tx2);
let new_commit_index = 10;
raft.handle_role_event(RoleEvent::NotifyNewCommitIndex(NewCommitData {
new_commit_index,
role: Follower.into(),
current_term: 1,
}))
.await
.expect("Should handle NotifyNewCommitIndex");
let msg1 = rx1.recv().await.expect("Listener 1 should receive");
let msg2 = rx2.recv().await.expect("Listener 2 should receive");
assert_eq!(msg1.new_commit_index, new_commit_index);
assert_eq!(msg2.new_commit_index, new_commit_index);
}
#[tokio::test]
async fn test_leader_discovered_no_state_change() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let initial_role = raft.role.as_i32();
raft.handle_role_event(RoleEvent::LeaderDiscovered(3, 5))
.await
.expect("Should handle LeaderDiscovered");
assert_eq!(raft.role.as_i32(), initial_role);
}
#[tokio::test]
async fn test_reprocess_event_requeues() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_single_peer() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_two_peers() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_three_peers() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_five_peers() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_match_index_zero() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_includes_learners() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_init_mixed_voters_learners() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_metadata_cache_replication_peers() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_metadata_cache_voters() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_metadata_cache_all_active() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_metadata_cache_learner_separation() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_metadata_cache_single_node() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let mut membership = crate::MockMembership::new();
membership.expect_is_single_node_cluster().returning(|| true);
membership.expect_voters().returning(Vec::new);
membership.expect_replication_peers().returning(Vec::new);
membership.expect_get_peers_id_with_condition().returning(|_| Vec::new());
raft.ctx.membership = Arc::new(membership);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leadership_verification_success() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leadership_verification_failure_downgrades() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(|_, _, _, _, _| Err(crate::Error::Fatal("Majority timeout".to_string())));
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_handler;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leadership_verification_single_node() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let mut membership = crate::MockMembership::new();
membership.expect_is_single_node_cluster().returning(|| true);
membership.expect_voters().returning(Vec::new);
membership.expect_replication_peers().returning(Vec::new);
membership.expect_get_peers_id_with_condition().returning(|_| Vec::new());
raft.ctx.membership = Arc::new(membership);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_quorum_exactly_majority_3node() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_quorum_5node_minority_failure() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_network_partition_minority_loses_leadership() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(|_, _, _, _, _| Err(crate::Error::Fatal("Partition".to_string())));
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_handler;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_leader_change_notification_correctness() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let (leader_tx, _leader_rx) = watch::channel(None);
raft.register_leader_change_listener(leader_tx);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_new_commit_notification_correctness() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (tx, mut rx) = mpsc::unbounded_channel();
raft.register_new_commit_listener(tx);
let new_commit_index = 25;
raft.handle_role_event(RoleEvent::NotifyNewCommitIndex(NewCommitData {
new_commit_index,
role: Follower.into(),
current_term: 5,
}))
.await
.unwrap();
let msg = rx.recv().await.expect("Should receive notification");
assert_eq!(msg.new_commit_index, new_commit_index);
}
#[tokio::test]
async fn test_role_transition_notification() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
raft.register_role_transition_listener(tx);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
let role = rx.recv().await.expect("Should receive role transition");
assert!(is_candidate(role));
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
let role = rx.recv().await.expect("Should receive role transition");
assert!(is_leader(role));
}
#[tokio::test]
async fn test_event_priority_p0_shutdown() {
let (graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
graceful_tx.send(()).expect("Should send shutdown signal");
}
#[tokio::test]
async fn test_event_priority_p1_tick() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_event_priority_p2_role_before_p3_raft() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_event_priority_p3_raft_last() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_concurrent_events_tick_role_raft() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_concurrent_events_role_multiple_raft() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_concurrent_events_shutdown_during_election() {
let (graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
graceful_tx.send(()).expect("Should send shutdown");
}
#[tokio::test]
async fn test_election_timeout_follower_to_candidate() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_election_timeout_candidate_restart() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
assert!(is_candidate(raft.role.as_i32()));
}
#[tokio::test]
async fn test_election_timeout_leader_no_action() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_election_timeout_reliable_timing() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_complete_election_flow() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
assert!(is_follower(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_invalid_transitions_return_error() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
assert!(raft.handle_role_event(RoleEvent::BecomeFollower(None)).await.is_err());
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
assert!(raft.handle_role_event(RoleEvent::BecomeCandidate).await.is_err());
raft.handle_role_event(RoleEvent::BecomeLearner).await.unwrap();
assert!(raft.handle_role_event(RoleEvent::BecomeLearner).await.is_err());
}
#[tokio::test]
async fn test_failed_transition_state_unchanged() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let initial_role = raft.role.as_i32();
let _ = raft.handle_role_event(RoleEvent::BecomeFollower(None)).await;
assert_eq!(raft.role.as_i32(), initial_role);
}
#[tokio::test]
async fn test_single_node_cluster_election() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
let mut membership = crate::MockMembership::new();
membership.expect_is_single_node_cluster().returning(|| true);
membership.expect_voters().returning(Vec::new);
membership.expect_replication_peers().returning(Vec::new);
membership.expect_get_peers_id_with_condition().returning(|_| Vec::new());
raft.ctx.membership = Arc::new(membership);
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
assert!(is_candidate(raft.role.as_i32()));
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
assert!(is_leader(raft.role.as_i32()));
}
#[tokio::test]
async fn test_multiple_listeners_concurrent() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx2, mut rx2) = mpsc::unbounded_channel();
let (tx3, mut rx3) = mpsc::unbounded_channel();
raft.register_new_commit_listener(tx1);
raft.register_new_commit_listener(tx2);
raft.register_new_commit_listener(tx3);
raft.handle_role_event(RoleEvent::NotifyNewCommitIndex(NewCommitData {
new_commit_index: 5,
role: Follower.into(),
current_term: 1,
}))
.await
.unwrap();
assert!(rx1.recv().await.is_some());
assert!(rx2.recv().await.is_some());
assert!(rx3.recv().await.is_some());
}
#[tokio::test]
async fn test_late_listener_registration() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let (raft_log, replication_core) = prepare_succeed_majority_confirmation();
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_core;
raft.handle_role_event(RoleEvent::BecomeCandidate).await.unwrap();
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
raft.register_role_transition_listener(tx);
raft.handle_role_event(RoleEvent::BecomeLeader).await.unwrap();
let role = rx.recv().await.expect("Should receive late registration");
assert!(is_leader(role));
}
#[tokio::test]
async fn test_joining_node_flag() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_snapshot_fetch_before_main_loop() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
}
#[tokio::test]
async fn test_snapshot_fetch_success() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_snapshot_fetch_failure_fallback() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_joining_node_starts_as_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_joining_node_receives_append_entries() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_joining_node_catchup() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_joining_node_quorum_participation() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_bootstrap_timing_snapshot_first() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_bootstrap_join_then_run() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_multiple_nodes_joining() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft1 = MockBuilder::new(graceful_rx.clone()).build_raft();
let (_graceful_tx2, graceful_rx2) = watch::channel(());
let _raft2 = MockBuilder::new(graceful_rx2).build_raft();
}
#[tokio::test]
async fn test_joining_node_empty_log() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_joining_node_no_premature_commit() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_snapshot_consistency_validation() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let _raft = MockBuilder::new(graceful_rx).build_raft();
assert!(is_follower(_raft.role.as_i32()));
}
#[tokio::test]
async fn test_graceful_shutdown_persists_hardstate() {
tokio::time::pause();
let (graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_save_hard_state().times(1).returning(|_| Ok(()));
raft.ctx.storage.raft_log = Arc::new(raft_log);
let raft_handle = tokio::spawn(async move { raft.run().await });
graceful_tx.send(()).expect("Should send shutdown signal");
tokio::time::advance(std::time::Duration::from_millis(2)).await;
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
let result = raft_handle.await;
assert!(
matches!(result, Ok(Ok(()))),
"Expected clean exit, but got {result:?}"
);
}