use std::sync::Arc;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::ReadConsistencyPolicy;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::common::LogId;
use d_engine_proto::error::ErrorCode;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_conf_update_response;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VotedFor;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use tonic::Code;
use crate::ClientCmd;
use crate::ConsensusError;
use crate::ElectionError;
use crate::Error;
use crate::MaybeCloneOneshot;
use crate::MockBuilder;
use crate::MockElectionCore;
use crate::MockMembership;
use crate::MockReplicationCore;
use crate::MockStateMachineHandler;
use crate::RaftEvent;
use crate::RaftOneshot;
use crate::RoleEvent;
use crate::raft_role::candidate_state::CandidateState;
use crate::raft_role::role_state::RaftRoleState;
use crate::test_utils::create_test_chunk;
use crate::test_utils::create_test_snapshot_stream;
use crate::test_utils::mock::MockTypeConfig;
use crate::test_utils::mock::mock_election_core;
use crate::test_utils::mock::mock_raft_context;
use crate::test_utils::node_config;
use tokio::sync::{mpsc, watch};
#[tokio::test]
async fn test_can_vote_myself_returns_true_for_new_candidate() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let mut node_config = crate::RaftNodeConfig::new().expect("Should create default config");
node_config.cluster.db_root_dir = temp_dir.path().to_path_buf();
let node_config = node_config.validate().expect("Should validate config");
let state = CandidateState::<MockTypeConfig>::new(1, Arc::new(node_config));
assert!(
state.can_vote_myself(),
"New candidate should be able to vote for itself"
);
}
#[tokio::test]
async fn test_can_vote_myself_returns_false_after_voting() {
let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
let mut node_config = crate::RaftNodeConfig::new().expect("Should create default config");
node_config.cluster.db_root_dir = temp_dir.path().to_path_buf();
let node_config = node_config.validate().expect("Should validate config");
let mut state = CandidateState::<MockTypeConfig>::new(1, Arc::new(node_config));
let voted_for = VotedFor {
voted_for_id: state.node_id(),
voted_for_term: state.current_term(),
committed: false,
};
state.update_voted_for(voted_for).expect("Should succeed to update voted_for");
assert!(
!state.can_vote_myself(),
"Candidate should not be able to vote again after already voting"
);
}
#[tokio::test]
async fn test_candidate_drain_read_buffer_returns_error() {
let mut state = CandidateState::<MockTypeConfig>::new(
1,
Arc::new(node_config("/tmp/test_candidate_drain")),
);
let result = state.drain_read_buffer();
assert!(
result.is_err(),
"Candidate drain_read_buffer should return error"
);
if let Err(e) = result {
let error_str = format!("{e:?}");
assert!(
error_str.contains("NotLeader"),
"Error should be NotLeader, got: {error_str}"
);
}
}
#[tokio::test(start_paused = true)]
async fn test_tick_triggers_new_election_round_on_success() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_tick_success", graceful_rx, None);
let mut election_handler = MockElectionCore::<MockTypeConfig>::new();
election_handler
.expect_broadcast_vote_requests()
.times(1)
.returning(|_, _, _, _, _| Ok(()));
context.handlers.election_handler = election_handler;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let (event_tx, _event_rx) = mpsc::channel(1);
let election_timeout_max = context.node_config.raft.election.election_timeout_max;
tokio::time::advance(tokio::time::Duration::from_millis(election_timeout_max + 1)).await;
assert!(
state.tick(&role_tx, &event_tx, &context).await.is_ok(),
"Tick should succeed"
);
assert_eq!(state.current_term(), 2, "Term should increment to 2");
assert_eq!(
state.voted_for().unwrap(),
Some(VotedFor {
voted_for_id: 1,
voted_for_term: 2,
committed: false,
}),
"Should vote for itself in new term"
);
}
#[tokio::test(start_paused = true)]
async fn test_tick_discovers_higher_term_and_steps_down() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_tick_higher_term", graceful_rx, None);
let mut election_handler = MockElectionCore::<MockTypeConfig>::new();
election_handler
.expect_broadcast_vote_requests()
.times(1)
.returning(|_, _, _, _, _| {
Err(Error::Consensus(ConsensusError::Election(
ElectionError::HigherTerm(100),
)))
});
context.handlers.election_handler = election_handler;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
let (event_tx, _event_rx) = mpsc::channel(1);
let election_timeout_max = context.node_config.raft.election.election_timeout_max;
tokio::time::advance(tokio::time::Duration::from_millis(election_timeout_max + 1)).await;
assert!(
state.tick(&role_tx, &event_tx, &context).await.is_ok(),
"Tick should succeed even with HigherTerm error"
);
assert_eq!(state.current_term(), 100, "Term should update to 100");
assert!(
matches!(role_rx.try_recv().unwrap(), RoleEvent::BecomeFollower(_)),
"Should send BecomeFollower event"
);
}
#[tokio::test]
async fn test_handle_vote_request_rejects_illegal_request() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_vote_reject", graceful_rx, None);
let mut election_core = mock_election_core();
election_core
.expect_check_vote_request_is_legal()
.returning(|_, _, _, _, _| false);
context.handlers.election_handler = election_core;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let term_before = state.current_term();
let request_term = term_before;
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::ReceiveVoteRequest(
VoteRequest {
term: request_term,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
},
resp_tx,
);
assert!(
state.handle_raft_event(raft_event, &context, role_tx).await.is_ok(),
"Should handle event successfully"
);
let response = resp_rx.recv().await.unwrap().unwrap();
assert!(!response.vote_granted, "Should reject vote");
assert!(
role_rx.try_recv().is_err(),
"Should not send role change event"
);
assert_eq!(state.current_term(), term_before, "Term should not change");
}
#[tokio::test]
async fn test_handle_vote_request_grants_and_steps_down_when_legal() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_vote_grant", graceful_rx, None);
let mut election_core = mock_election_core();
election_core
.expect_check_vote_request_is_legal()
.returning(|_, _, _, _, _| true);
context.handlers.election_handler = election_core;
let updated_term = 100;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::ReceiveVoteRequest(
VoteRequest {
term: updated_term,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
},
resp_tx,
);
assert!(
state.handle_raft_event(raft_event, &context, role_tx).await.is_ok(),
"Should handle event successfully"
);
assert!(
matches!(role_rx.try_recv(), Ok(RoleEvent::BecomeFollower(None))),
"Should send BecomeFollower event"
);
assert!(
matches!(role_rx.try_recv().unwrap(), RoleEvent::ReprocessEvent(_)),
"Should send ReprocessEvent for Follower to handle"
);
assert_eq!(
state.current_term(),
updated_term,
"Term should update to 100"
);
assert!(
resp_rx.recv().await.is_err(),
"Should not send response, let Follower handle it"
);
}
#[tokio::test]
async fn test_handle_cluster_conf_metadata_request() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_cluster_conf", graceful_rx, None);
let mut membership = MockMembership::new();
membership.expect_retrieve_cluster_membership_config().times(1).returning(
|_current_leader_id| ClusterMembership {
version: 1,
nodes: vec![],
current_leader_id: None,
},
);
context.membership = Arc::new(membership);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::ClusterConf(MetadataRequest {}, resp_tx);
assert!(
state.handle_raft_event(raft_event, &context, role_tx).await.is_ok(),
"Should handle ClusterConf event"
);
let membership_response = resp_rx.recv().await.unwrap().unwrap();
assert_eq!(
membership_response.nodes,
vec![],
"Should return cluster nodes"
);
}
#[tokio::test]
async fn test_handle_cluster_conf_update_success() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_conf_update", graceful_rx, None);
let mut membership = MockMembership::new();
membership
.expect_update_cluster_conf_from_leader()
.times(1)
.returning(|_, _, _, _, _| {
Ok(ClusterConfUpdateResponse {
id: 1,
term: 1,
version: 1,
success: true,
error_code: cluster_conf_update_response::ErrorCode::Unspecified.into(),
})
});
membership.expect_get_cluster_conf_version().returning(|| 1);
context.membership = Arc::new(membership);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::ClusterConfUpdate(
ClusterConfChangeRequest {
id: 2, term: 1,
version: 1,
change: None,
},
resp_tx,
);
assert!(
state.handle_raft_event(raft_event, &context, role_tx).await.is_ok(),
"Should handle ClusterConfUpdate event"
);
let response = resp_rx.recv().await.unwrap().unwrap();
assert!(response.success, "Update should succeed");
assert_eq!(
response.error_code,
cluster_conf_update_response::ErrorCode::Unspecified as i32,
"Should have no error"
);
}
#[tokio::test]
async fn test_handle_append_entries_steps_down_to_follower() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_append_entries_step_down", graceful_rx, None);
let term = 1;
let new_leader_term = term;
let new_leader_commit = 5;
let mut replication_handler = MockReplicationCore::new();
replication_handler
.expect_check_append_entries_request_is_legal()
.returning(move |_, _, _| AppendEntriesResponse::success(1, term, None));
let membership = MockMembership::new();
context.membership = Arc::new(membership);
context.handlers.replication_handler = replication_handler;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
state.update_current_term(term);
let append_entries_request = AppendEntriesRequest {
term: new_leader_term,
leader_id: 5,
prev_log_index: 0,
prev_log_term: 1,
entries: vec![],
leader_commit_index: new_leader_commit,
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let raft_event = RaftEvent::AppendEntries(append_entries_request, resp_tx);
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
assert!(state.handle_raft_event(raft_event, &context, role_tx).await.is_ok());
assert!(matches!(
role_rx.try_recv(),
Ok(RoleEvent::BecomeFollower(None))
));
assert!(matches!(
role_rx.try_recv().unwrap(),
RoleEvent::ReprocessEvent(_)
));
assert_eq!(state.current_term(), new_leader_term);
assert!(state.commit_index() != new_leader_commit);
assert!(resp_rx.recv().await.is_err());
}
#[tokio::test]
async fn test_handle_append_entries_rejects_lower_term() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_append_reject_lower", graceful_rx, None);
let term = 2;
let new_leader_term = term - 1;
let mut replication_handler = MockReplicationCore::new();
replication_handler
.expect_check_append_entries_request_is_legal()
.returning(move |_, _, _| AppendEntriesResponse::higher_term(1, term));
context.membership = Arc::new(MockMembership::new());
context.handlers.replication_handler = replication_handler;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
state.update_current_term(term);
let append_entries_request = AppendEntriesRequest {
term: new_leader_term,
leader_id: 5,
prev_log_index: 0,
prev_log_term: 1,
entries: vec![],
leader_commit_index: 0,
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let raft_event = RaftEvent::AppendEntries(append_entries_request, resp_tx);
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
assert!(state.handle_raft_event(raft_event, &context, role_tx).await.is_ok());
assert!(role_rx.try_recv().is_err());
assert_eq!(state.current_term(), term);
let response = resp_rx.recv().await.expect("should succeed").unwrap();
assert!(response.is_higher_term());
}
#[tokio::test]
async fn test_handle_append_entries_conflict() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_append_conflict", graceful_rx, None);
let term = 2;
let new_leader_term = term - 1;
let mut replication_handler = MockReplicationCore::new();
replication_handler
.expect_check_append_entries_request_is_legal()
.returning(move |_, _, _| AppendEntriesResponse::conflict(1, term, None, None));
context.membership = Arc::new(MockMembership::new());
context.handlers.replication_handler = replication_handler;
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
state.update_current_term(term);
let append_entries_request = AppendEntriesRequest {
term: new_leader_term,
leader_id: 5,
prev_log_index: 0,
prev_log_term: 1,
entries: vec![],
leader_commit_index: 0,
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let raft_event = RaftEvent::AppendEntries(append_entries_request, resp_tx);
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
assert!(state.handle_raft_event(raft_event, &context, role_tx).await.is_ok());
assert!(role_rx.try_recv().is_err());
assert_eq!(state.current_term(), term);
let response = resp_rx.recv().await.expect("should succeed").unwrap();
assert!(response.is_conflict());
}
#[tokio::test]
async fn test_handle_client_write_returns_not_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_client_write", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Propose(
ClientWriteRequest {
client_id: 1,
command: Some(WriteCommand::default()),
},
resp_tx,
);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
#[test]
fn test_send_become_follower_and_replay_events() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_replay_event", graceful_rx, None);
let state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
let (resp_tx, _resp_rx) = MaybeCloneOneshot::new();
assert!(state.send_become_follower_event(&role_tx).is_ok());
assert!(
state
.send_replay_raft_event(
&role_tx,
RaftEvent::ReceiveVoteRequest(
VoteRequest {
term: 1,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
},
resp_tx,
),
)
.is_ok()
);
assert!(matches!(
role_rx.try_recv().unwrap(),
RoleEvent::BecomeFollower(None)
));
assert!(matches!(
role_rx.try_recv().unwrap(),
RoleEvent::ReprocessEvent(_)
));
}
#[tokio::test]
async fn test_handle_install_snapshot_returns_permission_denied() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_install_snapshot", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let stream = create_test_snapshot_stream(vec![create_test_chunk(0, b"chunk0", 1, 1, 2)]);
let raft_event = RaftEvent::InstallSnapshotChunk(Box::new(stream), resp_tx);
let result = state.handle_raft_event(raft_event, &context, mpsc::unbounded_channel().0).await;
assert!(result.is_err(), "Expected error");
let response = resp_rx.recv().await.expect("Response should be received");
assert!(response.is_err(), "Expected error response");
let status = response.unwrap_err();
assert_eq!(status.code(), Code::PermissionDenied);
assert_eq!(status.message(), "Not Follower or Learner.");
}
#[tokio::test]
async fn test_handle_discover_leader_returns_permission_denied() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_discover_leader", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let request = LeaderDiscoveryRequest {
node_id: 2,
requester_address: "127.0.0.1:9090".to_string(),
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let raft_event = RaftEvent::DiscoverLeader(request, resp_tx);
let result = state.handle_raft_event(raft_event, &context, mpsc::unbounded_channel().0).await;
assert!(result.is_ok(), "Expected Ok");
let response = resp_rx.recv().await.expect("Response should be received");
assert!(response.is_err(), "Expected error response");
let status = response.unwrap_err();
assert_eq!(status.code(), Code::PermissionDenied);
}
#[cfg(test)]
mod role_violation_tests {
use super::*;
#[tokio::test]
async fn test_candidate_role_violation_errors() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_role_violation", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::CreateSnapshotEvent;
let e = state.handle_raft_event(raft_event, &context, role_tx).await.unwrap_err();
assert!(matches!(
e,
Error::Consensus(ConsensusError::RoleViolation { .. })
));
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::SnapshotCreated(Err(Error::Fatal("test".to_string())));
let e = state.handle_raft_event(raft_event, &context, role_tx).await.unwrap_err();
assert!(matches!(
e,
Error::Consensus(ConsensusError::RoleViolation { .. })
));
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::LogPurgeCompleted(LogId { term: 1, index: 1 });
let e = state.handle_raft_event(raft_event, &context, role_tx).await.unwrap_err();
assert!(matches!(
e,
Error::Consensus(ConsensusError::RoleViolation { .. })
));
}
#[tokio::test]
async fn test_candidate_does_not_create_snapshot_on_apply_completed() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_candidate_no_snapshot", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let (role_tx, mut role_rx) = mpsc::unbounded_channel();
let apply_completed_event = RaftEvent::ApplyCompleted {
last_index: 1000,
results: vec![],
};
let result =
state.handle_raft_event(apply_completed_event, &context, role_tx.clone()).await;
assert!(
result.is_ok(),
"Candidate should handle ApplyCompleted without error"
);
match role_rx.try_recv() {
Err(mpsc::error::TryRecvError::Empty) => {
}
Err(mpsc::error::TryRecvError::Disconnected) => {
panic!("role event channel disconnected unexpectedly");
}
Ok(event) => {
panic!(
"Candidate should not send any events on ApplyCompleted, but received: {event:?}"
);
}
}
drop(role_tx);
}
}
#[cfg(test)]
mod handle_client_read_request {
use super::*;
#[tokio::test]
async fn test_client_read_linearizable_returns_not_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context("/tmp/test_read_linearizable", graceful_rx, None);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::LinearizableRead as i32),
keys: vec![],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
#[tokio::test]
async fn test_client_read_eventual_consistency_succeeds() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut context = mock_raft_context("/tmp/test_read_eventual", graceful_rx, None);
let mut state_machine_handler = MockStateMachineHandler::<MockTypeConfig>::new();
state_machine_handler
.expect_read_from_state_machine()
.times(1)
.returning(|_| Some(vec![]));
context.handlers.state_machine_handler = Arc::new(state_machine_handler);
let mut state = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::EventualConsistency as i32),
keys: vec![],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let response = resp_rx.recv().await.unwrap().unwrap();
assert_eq!(response.error, ErrorCode::Success as i32);
}
}
#[tokio::test]
async fn test_candidate_handles_fatal_error_returns_error() {
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let context = crate::test_utils::mock::mock_raft_context(
"/tmp/test_candidate_handles_fatal_error_returns_error",
shutdown_rx,
None,
);
let mut candidate = CandidateState::<MockTypeConfig>::new(1, context.node_config.clone());
let fatal_error = RaftEvent::FatalError {
source: "StateMachine".to_string(),
error: "Network failure - cannot persist state".to_string(),
};
let (role_tx, mut role_rx) = mpsc::unbounded_channel::<RoleEvent>();
let result = candidate.handle_raft_event(fatal_error, &context, role_tx).await;
assert!(
result.is_err(),
"Expected handle_raft_event to return Err, got: {result:?}"
);
match result.unwrap_err() {
Error::Fatal(msg) => {
assert!(
msg.contains("StateMachine"),
"Error message should mention source, got: {msg}"
);
}
other => panic!("Expected Error::Fatal, got: {other:?}"),
}
assert!(
role_rx.try_recv().is_err(),
"No role transition events should be sent during FatalError handling"
);
}
#[tokio::test]
async fn test_new_leader_initializes_empty_buffers() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut raft = MockBuilder::new(graceful_rx).build_raft();
let mut raft_log = crate::MockRaftLog::new();
raft_log.expect_last_entry_id().returning(|| 11);
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_append_entries().returning(|_| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| Some(11));
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
let mut replication_handler = crate::MockReplicationCore::new();
replication_handler
.expect_handle_raft_request_in_batch()
.returning(|_, _, _, _, _| {
Ok(crate::AppendResults {
commit_quorum_achieved: true,
learner_progress: std::collections::HashMap::new(),
peer_updates: std::collections::HashMap::new(),
})
});
raft.ctx.storage.raft_log = Arc::new(raft_log);
raft.ctx.handlers.replication_handler = replication_handler;
raft.handle_role_event(crate::RoleEvent::BecomeCandidate)
.await
.expect("Should become Candidate");
raft.handle_role_event(crate::RoleEvent::BecomeLeader)
.await
.expect("Should become Leader");
assert!(
matches!(raft.role, crate::RaftRole::Leader(_)),
"Should be in Leader state"
);
if let crate::RaftRole::Leader(ref mut leader) = raft.role {
let (response_tx, _response_rx) = crate::MaybeCloneOneshot::new();
let write_cmd = d_engine_proto::client::WriteCommand {
operation: Some(d_engine_proto::client::write_command::Operation::Insert(
d_engine_proto::client::write_command::Insert {
key: bytes::Bytes::from("first_key"),
value: bytes::Bytes::from("first_value"),
ttl_secs: 0,
},
)),
};
let write_req = d_engine_proto::client::ClientWriteRequest {
client_id: 1,
command: Some(write_cmd),
};
let cmd = crate::ClientCmd::Propose(write_req, response_tx);
leader.push_client_cmd(cmd, &raft.ctx);
let (role_tx, _role_rx) = tokio::sync::mpsc::unbounded_channel();
let result = leader.flush_cmd_buffers(&raft.ctx, &role_tx).await;
assert!(
result.is_ok(),
"New Leader should successfully flush buffers, got: {result:?}"
);
} else {
panic!("Expected Leader state after BecomeLeader");
}
}