use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::ReadConsistencyPolicy;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::common::LogId;
use d_engine_proto::common::NodeRole;
use d_engine_proto::error::ErrorCode;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_conf_update_response;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::VotedFor;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use std::sync::Arc;
use tonic::Code;
use tonic::Status;
use crate::AppendResponseWithUpdates;
use crate::ClientCmd;
use crate::Error;
use crate::HardState;
use crate::MaybeCloneOneshot;
use crate::MaybeCloneOneshotSender;
use crate::MockElectionCore;
use crate::MockMembership;
use crate::MockReplicationCore;
use crate::MockStateMachineHandler;
use crate::NetworkError;
use crate::NewCommitData;
use crate::RaftEvent;
use crate::RaftLog;
use crate::RaftOneshot;
use crate::RoleEvent;
use crate::StateUpdate;
use crate::SystemError;
use crate::raft_role::follower_state::FollowerState;
use crate::raft_role::role_state::RaftRoleState;
use crate::test_utils::mock::MockBuilder;
use crate::test_utils::mock::MockTypeConfig;
use crate::test_utils::mock::mock_raft_context;
use crate::test_utils::mock::mock_raft_context_with_temp;
use crate::test_utils::node_config;
use mockall::predicate::eq;
use tokio::sync::{mpsc, watch};
/// Builds a `RaftEvent::ReceiveVoteRequest` for the given term and candidate.
///
/// The request's `last_log_index` and `last_log_term` are both fixed at 0,
/// and `resp_tx` is attached to the event unchanged.
fn create_vote_request_event(
    term: u64,
    candidate_id: u32,
    resp_tx: MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
) -> RaftEvent {
    let request = VoteRequest {
        term,
        candidate_id,
        last_log_index: 0,
        last_log_term: 0,
    };
    RaftEvent::ReceiveVoteRequest(request, resp_tx)
}
/// `drain_read_buffer` on a newly constructed follower must fail, and the
/// error's `Debug` rendering must mention `NotLeader`.
#[tokio::test]
async fn test_follower_drain_read_buffer_returns_error() {
    let config = Arc::new(node_config("/tmp/test_follower_drain"));
    let mut follower = FollowerState::<MockTypeConfig>::new(1, config, None, None);

    let outcome = follower.drain_read_buffer();
    assert!(
        outcome.is_err(),
        "Follower drain_read_buffer should return error"
    );

    // The assertion above guarantees the error branch is the one taken here.
    if let Some(error_str) = outcome.err().map(|e| format!("{e:?}")) {
        assert!(
            error_str.contains("NotLeader"),
            "Error should be NotLeader, got: {error_str}"
        );
    }
}
/// A follower built with neither persisted hard state nor a last-applied
/// index reports the default values asserted below and exposes no
/// leader-only bookkeeping (`next_index`, `match_index`, `noop_log_id`).
#[tokio::test]
async fn test_new_initializes_fresh_state_correctly() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    // Both optional inputs are `None`: nothing is restored from storage.
    let follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    // Term / vote / commit defaults.
    assert_eq!(
        follower.commit_index(),
        0,
        "Fresh start should have commit_index=0"
    );
    assert_eq!(follower.current_term(), 1, "Fresh start should have term=1");
    assert_eq!(
        follower.voted_for().unwrap(),
        None,
        "Fresh start should not have voted"
    );

    // Replication progress is not tracked by this role.
    assert_eq!(
        follower.next_index(follower.node_id()),
        None,
        "Follower doesn't track next_index"
    );
    assert_eq!(
        follower.match_index(follower.node_id()),
        None,
        "Follower doesn't track match_index"
    );
    assert!(
        follower.noop_log_id().is_err(),
        "Only Leader has noop_log_id"
    );
}
#[tokio::test]
async fn test_new_restores_persisted_state_on_restart() {
let voted_for = VotedFor {
voted_for_id: 3,
voted_for_term: 2,
committed: false,
};
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let node_id = 1;
let hard_state_from_db = Some(HardState {
current_term: 2,
voted_for: Some(VotedFor {
voted_for_id: 3,
voted_for_term: 2,
committed: false,
}),
});
let last_applied_index_option = Some(2);
let state = FollowerState::<MockTypeConfig>::new(
node_id,
context.node_config.clone(),
hard_state_from_db,
last_applied_index_option,
);
assert_eq!(
state.commit_index(),
2,
"Should restore commit_index from last_applied"
);
assert_eq!(
state.current_term(),
2,
"Should restore term from hard_state"
);
assert_eq!(
state.voted_for().unwrap(),
Some(voted_for),
"Should restore voted_for"
);
assert!(
state.noop_log_id().is_err(),
"Follower doesn't have noop_log_id"
);
}
/// When the election handler returns a `StateUpdate` with no new vote and no
/// term update, the follower answers with `vote_granted == false`, emits no
/// role event and keeps its term.
#[tokio::test]
async fn test_handle_vote_request_rejects_when_handler_returns_none() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    // Election handler reports "nothing changed".
    let mut election_core = MockElectionCore::<MockTypeConfig>::new();
    election_core
        .expect_handle_vote_request()
        .times(1)
        .returning(|_, _, _, _| {
            Ok(StateUpdate {
                new_voted_for: None,
                term_update: None,
            })
        });
    context.handlers.election_handler = election_core;

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    let initial_term = follower.current_term();

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let outcome = follower
        .handle_raft_event(create_vote_request_event(1, 1, resp_tx), &context, role_tx)
        .await;
    assert!(outcome.is_ok());

    let vote = resp_rx.recv().await.unwrap().unwrap();
    assert!(!vote.vote_granted, "Should reject vote");
    assert!(role_rx.try_recv().is_err(), "No role change");
    assert_eq!(initial_term, follower.current_term(), "Term unchanged");
}
/// When the election handler returns a new vote together with a term update,
/// the follower grants the vote, adopts the new term and emits no role event.
#[tokio::test]
async fn test_handle_vote_request_grants_and_updates_term() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let new_term = 100;
    let mut election_core = MockElectionCore::<MockTypeConfig>::new();
    election_core
        .expect_handle_vote_request()
        .times(1)
        .returning(move |_, _, _, _| {
            let granted_vote = VotedFor {
                voted_for_id: 1,
                voted_for_term: 1,
                committed: false,
            };
            Ok(StateUpdate {
                new_voted_for: Some(granted_vote),
                term_update: Some(new_term),
            })
        });
    context.handlers.election_handler = election_core;

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let outcome = follower
        .handle_raft_event(create_vote_request_event(1, 1, resp_tx), &context, role_tx)
        .await;
    assert!(outcome.is_ok());

    let vote = resp_rx.recv().await.unwrap().unwrap();
    assert!(vote.vote_granted, "Should grant vote");
    assert!(role_rx.try_recv().is_err(), "Should remain Follower");
    assert_eq!(follower.current_term(), new_term, "Term should update");
}
#[tokio::test]
async fn test_handle_vote_request_returns_error_on_handler_failure() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut election_handler = MockElectionCore::<MockTypeConfig>::new();
election_handler.expect_handle_vote_request().times(1).returning(|_, _, _, _| {
Err(Error::System(SystemError::Network(
NetworkError::SingalSendFailed("".to_string()),
)))
});
context.handlers.election_handler = election_handler;
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let term_before = state.current_term();
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = create_vote_request_event(1, 1, resp_tx);
assert!(state.handle_raft_event(raft_event, &context, role_tx).await.is_err());
let r = resp_rx.recv().await.unwrap().unwrap();
assert!(!r.vote_granted, "Should reject on error");
assert_eq!(state.current_term(), term_before, "Term unchanged");
}
/// A `ClusterConf` metadata request is answered with the membership returned
/// by `retrieve_cluster_membership_config` (here: an empty node list).
#[tokio::test]
async fn test_handle_cluster_conf_metadata_request() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_retrieve_cluster_membership_config()
        .times(1)
        .returning(|_| ClusterMembership {
            version: 1,
            nodes: vec![],
            current_leader_id: None,
        });
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let event = RaftEvent::ClusterConf(MetadataRequest {}, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let membership_reply = resp_rx.recv().await.unwrap().unwrap();
    assert_eq!(membership_reply.nodes, vec![]);
}
/// A `ClusterConfUpdate` event is answered with the successful response that
/// the membership layer produced.
#[tokio::test]
async fn test_handle_cluster_conf_update_success() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_update_cluster_conf_from_leader()
        .times(1)
        .returning(|_, _, _, _, _| {
            let accepted = ClusterConfUpdateResponse {
                id: 1,
                term: 1,
                version: 1,
                success: true,
                error_code: cluster_conf_update_response::ErrorCode::Unspecified.into(),
            };
            Ok(accepted)
        });
    membership_mock.expect_get_cluster_conf_version().returning(|| 1);
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let change_request = ClusterConfChangeRequest {
        id: 2,
        term: 1,
        version: 1,
        change: None,
    };
    let event = RaftEvent::ClusterConfUpdate(change_request, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert!(reply.success);
    assert_eq!(
        reply.error_code,
        cluster_conf_update_response::ErrorCode::Unspecified as i32
    );
}
/// After the (paused) clock is advanced past `election_timeout_max`, a call
/// to `tick` succeeds and a `RoleEvent::BecomeCandidate` is emitted.
#[tokio::test(start_paused = true)]
async fn test_tick_triggers_election_on_timeout() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let (event_tx, _event_rx) = mpsc::channel(1);

    // Move the test clock one millisecond beyond the largest election timeout.
    let election_timeout_max = context.node_config.raft.election.election_timeout_max;
    let past_deadline = tokio::time::Duration::from_millis(election_timeout_max + 1);
    tokio::time::advance(past_deadline).await;

    let tick_outcome = follower.tick(&role_tx, &event_tx, &context).await;
    assert!(tick_outcome.is_ok(), "Tick should succeed");

    let emitted = role_rx.recv().await.unwrap();
    assert!(
        matches!(emitted, RoleEvent::BecomeCandidate),
        "Should send BecomeCandidate event on timeout"
    );
}
/// An `AppendEntries` request carrying a higher term, handled successfully by
/// the replication handler, makes the follower:
/// - emit `LeaderDiscovered(5, _)` and then `NotifyNewCommitIndex`,
/// - adopt the leader's term and the reported commit index,
/// - reply with a success response.
#[tokio::test]
async fn test_handle_append_entries_success_from_new_leader() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let initial_term = 1;
    let leader_term = initial_term + 1;
    let expected_commit = 2;

    // Replication handler accepts the request and reports a commit advance.
    let mut replication_core = MockReplicationCore::new();
    replication_core
        .expect_handle_append_entries()
        .returning(move |_, _, _| {
            let last_match = LogId {
                term: leader_term,
                index: 1,
            };
            Ok(AppendResponseWithUpdates {
                response: AppendEntriesResponse::success(1, leader_term, Some(last_match)),
                commit_index_update: Some(expected_commit),
            })
        });
    context.membership = Arc::new(MockMembership::new());
    context.handlers.replication_handler = replication_core;

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.update_current_term(initial_term);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let event = RaftEvent::AppendEntries(
        AppendEntriesRequest {
            term: leader_term,
            leader_id: 5,
            prev_log_index: 0,
            prev_log_term: 1,
            entries: vec![],
            leader_commit_index: 0,
        },
        resp_tx,
    );
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok(), "handle_raft_event should succeed");

    // Role events are checked in emission order.
    let first_event = role_rx.try_recv().unwrap();
    assert!(
        matches!(first_event, RoleEvent::LeaderDiscovered(5, _)),
        "Should send LeaderDiscovered event"
    );
    let second_event = role_rx.try_recv().unwrap();
    assert!(
        matches!(
            second_event,
            RoleEvent::NotifyNewCommitIndex(NewCommitData { .. })
        ),
        "Should send NotifyNewCommitIndex event"
    );

    assert_eq!(
        follower.current_term(),
        leader_term,
        "Should update term to leader's term"
    );
    assert_eq!(
        follower.commit_index(),
        expected_commit,
        "Should update commit_index"
    );

    let reply = resp_rx.recv().await.expect("should receive response").unwrap();
    assert!(reply.is_success(), "Response should indicate success");
}
/// An `AppendEntries` request whose term is lower than the follower's term
/// produces a "higher term" response, emits no role event and leaves the
/// follower's term untouched.
#[tokio::test]
async fn test_handle_append_entries_rejects_stale_term() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let local_term = 2;
    let outdated_term = local_term - 1;

    let mut replication_core = MockReplicationCore::new();
    replication_core
        .expect_check_append_entries_request_is_legal()
        .returning(move |_, _, _| AppendEntriesResponse::success(1, local_term, None));
    context.membership = Arc::new(MockMembership::new());
    context.handlers.replication_handler = replication_core;

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.update_current_term(local_term);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let event = RaftEvent::AppendEntries(
        AppendEntriesRequest {
            term: outdated_term,
            leader_id: 5,
            prev_log_index: 0,
            prev_log_term: 1,
            entries: vec![],
            leader_commit_index: 0,
        },
        resp_tx,
    );
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok(), "handle_raft_event should succeed");

    assert!(role_rx.try_recv().is_err(), "Should not send any events");
    assert_eq!(
        follower.current_term(),
        local_term,
        "Term should remain unchanged"
    );

    let reply = resp_rx.recv().await.expect("should receive response").unwrap();
    assert!(
        reply.is_higher_term(),
        "Response should indicate higher term"
    );
}
/// When the replication handler fails on a higher-term `AppendEntries`
/// request, `handle_raft_event` returns an error, yet the follower still
/// emits `LeaderDiscovered(5, _)` (and nothing else), adopts the new term
/// and replies with a non-success response.
#[tokio::test]
async fn test_handle_append_entries_with_handler_error() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let initial_term = 1;
    let leader_term = initial_term + 1;

    let mut replication_core = MockReplicationCore::new();
    replication_core
        .expect_handle_append_entries()
        .returning(|_, _, _| Err(Error::Fatal("test error".to_string())));
    context.membership = Arc::new(MockMembership::new());
    context.handlers.replication_handler = replication_core;

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.update_current_term(initial_term);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, mut role_rx) = mpsc::unbounded_channel();
    let event = RaftEvent::AppendEntries(
        AppendEntriesRequest {
            term: leader_term,
            leader_id: 5,
            prev_log_index: 0,
            prev_log_term: 1,
            entries: vec![],
            leader_commit_index: 0,
        },
        resp_tx,
    );
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_err(), "handle_raft_event should return error");

    let first_event = role_rx.try_recv().unwrap();
    assert!(
        matches!(first_event, RoleEvent::LeaderDiscovered(5, _)),
        "Should send LeaderDiscovered event even on error"
    );
    assert!(
        role_rx.try_recv().is_err(),
        "No other events should be sent"
    );
    assert_eq!(
        follower.current_term(),
        leader_term,
        "Should update term even on error"
    );

    let reply = resp_rx.recv().await.expect("should receive response").unwrap();
    assert!(!reply.is_success(), "Response should indicate failure");
}
/// A `NotLeader` rejection produced by the membership layer is forwarded to
/// the requester unchanged (`success == false`, `NotLeader` error code).
#[tokio::test]
async fn test_handle_cluster_conf_update_rejects_non_leader() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_update_cluster_conf_from_leader()
        .times(1)
        .returning(|_, _, _, _, _| {
            let rejection = ClusterConfUpdateResponse {
                id: 1,
                term: 1,
                version: 1,
                success: false,
                error_code: cluster_conf_update_response::ErrorCode::NotLeader.into(),
            };
            Ok(rejection)
        });
    membership_mock.expect_get_cluster_conf_version().returning(|| 1);
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let change_request = ClusterConfChangeRequest {
        id: 3,
        term: 1,
        version: 1,
        change: None,
    };
    let event = RaftEvent::ClusterConfUpdate(change_request, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert!(!reply.success, "Should reject non-leader request");
    assert_eq!(
        reply.error_code,
        cluster_conf_update_response::ErrorCode::NotLeader as i32
    );
}
/// A `VersionConflict` rejection from the membership layer (request version
/// 4 against reported version 5) reaches the requester with the reported
/// version attached.
#[tokio::test]
async fn test_handle_cluster_conf_update_detects_version_conflict() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_update_cluster_conf_from_leader()
        .times(1)
        .returning(|_, _, _, _, _| {
            let conflict = ClusterConfUpdateResponse {
                id: 1,
                term: 1,
                version: 5,
                success: false,
                error_code: cluster_conf_update_response::ErrorCode::VersionConflict.into(),
            };
            Ok(conflict)
        });
    membership_mock.expect_get_cluster_conf_version().returning(|| 5);
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    // The request carries version 4, one behind the mocked version 5.
    let change_request = ClusterConfChangeRequest {
        id: 2,
        term: 1,
        version: 4,
        change: None,
    };
    let event = RaftEvent::ClusterConfUpdate(change_request, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert!(!reply.success, "Should reject stale version");
    assert_eq!(
        reply.error_code,
        cluster_conf_update_response::ErrorCode::VersionConflict as i32
    );
    assert_eq!(reply.version, 5, "Should return current version");
}
/// A `TermOutdated` rejection from the membership layer (request term 4,
/// follower term 5) reaches the requester with term 5 attached.
#[tokio::test]
async fn test_handle_cluster_conf_update_rejects_stale_term() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_update_cluster_conf_from_leader()
        .times(1)
        .returning(|_, _, _, _, _| {
            let outdated = ClusterConfUpdateResponse {
                id: 1,
                term: 5,
                version: 1,
                success: false,
                error_code: cluster_conf_update_response::ErrorCode::TermOutdated.into(),
            };
            Ok(outdated)
        });
    membership_mock.expect_get_cluster_conf_version().returning(|| 1);
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.update_current_term(5);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    // The request carries term 4, one behind the follower's term 5.
    let change_request = ClusterConfChangeRequest {
        id: 2,
        term: 4,
        version: 1,
        change: None,
    };
    let event = RaftEvent::ClusterConfUpdate(change_request, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert!(!reply.success, "Should reject stale term");
    assert_eq!(
        reply.error_code,
        cluster_conf_update_response::ErrorCode::TermOutdated as i32
    );
    assert_eq!(reply.term, 5, "Should return current term");
}
#[tokio::test]
async fn test_handle_cluster_conf_update_handles_internal_error() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut membership = MockMembership::new();
membership
.expect_update_cluster_conf_from_leader()
.times(1)
.returning(|_, _, _, _, _| {
Err(Error::Consensus(crate::ConsensusError::Membership(
crate::MembershipError::ConfigChangeUpdateFailed("test error".to_string()),
)))
});
membership.expect_get_cluster_conf_version().returning(|| 1);
context.membership = Arc::new(membership);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::ClusterConfUpdate(
ClusterConfChangeRequest {
id: 2,
term: 1,
version: 1,
change: None,
},
resp_tx,
);
assert!(state.handle_raft_event(raft_event, &context, role_tx).await.is_ok());
let response = resp_rx.recv().await.unwrap().unwrap();
assert!(!response.success, "Should fail on internal error");
assert_eq!(
response.error_code,
cluster_conf_update_response::ErrorCode::InternalError as i32
);
}
/// With a newly constructed follower and a membership layer that answers
/// `NotLeader`, the requester receives a failed response carrying the
/// `NotLeader` error code.
#[tokio::test]
async fn test_handle_cluster_conf_update_when_leader_unknown() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    let mut membership_mock = MockMembership::new();
    membership_mock
        .expect_update_cluster_conf_from_leader()
        .times(1)
        .returning(|_, _, _, _, _| {
            let rejection = ClusterConfUpdateResponse {
                id: 1,
                term: 1,
                version: 1,
                success: false,
                error_code: cluster_conf_update_response::ErrorCode::NotLeader.into(),
            };
            Ok(rejection)
        });
    membership_mock.expect_get_cluster_conf_version().returning(|| 1);
    context.membership = Arc::new(membership_mock);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let change_request = ClusterConfChangeRequest {
        id: 3,
        term: 1,
        version: 1,
        change: None,
    };
    let event = RaftEvent::ClusterConfUpdate(change_request, resp_tx);
    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(outcome.is_ok());

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert!(!reply.success, "Should reject when leader unknown");
    assert_eq!(
        reply.error_code,
        cluster_conf_update_response::ErrorCode::NotLeader as i32
    );
}
#[tokio::test]
async fn test_handle_client_write_request_redirects_to_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Propose(
ClientWriteRequest {
client_id: 1,
command: Some(WriteCommand::default()),
},
resp_tx,
);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(result.is_err(), "Should return NOT_LEADER error");
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
#[tokio::test]
async fn test_handle_client_read_request_linearizable_redirects_to_leader() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::LinearizableRead as i32),
keys: vec![],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(result.is_err(), "Should return NOT_LEADER error");
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
/// A read with the `EventualConsistency` policy pushed to a follower is
/// answered from the state machine handler (called exactly once) with the
/// `Success` error code.
#[tokio::test]
async fn test_handle_client_read_request_eventual_consistency_succeeds() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (mut context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);

    // State machine read yields an empty result set.
    let mut sm_handler = MockStateMachineHandler::<MockTypeConfig>::new();
    sm_handler
        .expect_read_from_state_machine()
        .times(1)
        .returning(|_| Some(vec![]));
    context.handlers.state_machine_handler = Arc::new(sm_handler);

    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let read_request = ClientReadRequest {
        client_id: 1,
        consistency_policy: Some(ReadConsistencyPolicy::EventualConsistency as i32),
        keys: vec![],
    };
    follower.push_client_cmd(ClientCmd::Read(read_request, resp_tx), &context);

    let reply = resp_rx.recv().await.unwrap().unwrap();
    assert_eq!(
        reply.error,
        ErrorCode::Success as i32,
        "EventualConsistency read should succeed on follower"
    );
}
/// A `JoinCluster` event makes `handle_raft_event` return an error on a
/// follower, and the requester receives a `PermissionDenied` status.
#[tokio::test]
async fn test_handle_join_cluster_rejects_on_follower() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let join_request = JoinRequest {
        status: d_engine_proto::common::NodeStatus::Promotable as i32,
        node_id: 2,
        node_role: NodeRole::Learner.into(),
        address: "127.0.0.1:9090".to_string(),
    };
    let event = RaftEvent::JoinCluster(join_request, resp_tx);

    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(
        outcome.is_err(),
        "Follower should reject JoinCluster request"
    );

    let reply = resp_rx.recv().await.expect("Should receive response");
    assert!(reply.is_err(), "Response should be error");
    assert_eq!(
        reply.unwrap_err().code(),
        Code::PermissionDenied,
        "Should return PermissionDenied"
    );
}
/// A `DiscoverLeader` event is handled without error by a follower, but the
/// requester receives a `PermissionDenied` status.
#[tokio::test]
async fn test_handle_leader_discovery_rejects_on_follower() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let discovery_request = LeaderDiscoveryRequest {
        node_id: 2,
        requester_address: "127.0.0.1:9090".to_string(),
    };
    let event = RaftEvent::DiscoverLeader(discovery_request, resp_tx);

    let outcome = follower.handle_raft_event(event, &context, role_tx).await;
    assert!(
        outcome.is_ok(),
        "handle_raft_event should return Ok for DiscoverLeader"
    );

    let reply = resp_rx.recv().await.expect("Should receive response");
    assert!(reply.is_err(), "Response should be error");
    assert_eq!(
        reply.unwrap_err().code(),
        Code::PermissionDenied,
        "Should return PermissionDenied"
    );
}
/// Checks `can_purge_logs` around the upper bound imposed by `commit_index`.
///
/// With `commit_index = 100` and a previous purge at index 90:
/// - an index strictly inside the range (95) is accepted,
/// - the boundary `commit_index - 1` (99) is accepted,
/// - `commit_index` itself (100) is rejected.
///
/// The interior case previously repeated the 99 boundary call verbatim, so
/// it added no coverage; it now uses a distinct interior index.
#[test]
fn test_can_purge_logs_validates_safe_range() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut state =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    state.shared_state.commit_index = 100;

    // Interior of the valid range: 90 < 95 < 100.
    assert!(
        state.can_purge_logs(
            Some(LogId { index: 90, term: 1 }),
            LogId { index: 95, term: 1 }
        ),
        "Should allow purge when last_included < commit_index"
    );

    // Upper boundary: 99 == commit_index - 1.
    assert!(
        state.can_purge_logs(
            Some(LogId { index: 90, term: 1 }),
            LogId { index: 99, term: 1 }
        ),
        "Should allow purge up to commit_index - 1"
    );

    // One past the boundary: 100 == commit_index.
    assert!(
        !state.can_purge_logs(
            Some(LogId { index: 90, term: 1 }),
            LogId {
                index: 100,
                term: 1
            }
        ),
        "Should reject purge at commit_index (violates gap)"
    );
}
/// With `commit_index = 50`, `can_purge_logs` must refuse a purge target at
/// index 51 (beyond the commit index) and at index 50 (equal to it).
#[test]
fn test_can_purge_logs_rejects_uncommitted_index() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.shared_state.commit_index = 50;

    let beyond_commit = follower.can_purge_logs(
        Some(LogId { index: 40, term: 1 }),
        LogId { index: 51, term: 1 },
    );
    assert!(!beyond_commit, "Should reject purge beyond commit_index");

    let at_commit = follower.can_purge_logs(
        Some(LogId { index: 40, term: 1 }),
        LogId { index: 50, term: 1 },
    );
    assert!(!at_commit, "Should reject purge at commit_index");
}
/// With `commit_index = 200`, `can_purge_logs` must accept a purge target
/// ahead of the previous one (100 -> 150) and refuse a target behind it
/// (150 -> 120) or equal to it (150 -> 150).
#[test]
fn test_can_purge_logs_ensures_monotonicity() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.shared_state.commit_index = 200;

    // Forward: 100 -> 150.
    let forward = follower.can_purge_logs(
        Some(LogId {
            index: 100,
            term: 1,
        }),
        LogId {
            index: 150,
            term: 1,
        },
    );
    assert!(forward, "Should allow monotonic purge advance");

    // Backward: 150 -> 120.
    let backward = follower.can_purge_logs(
        Some(LogId {
            index: 150,
            term: 1,
        }),
        LogId {
            index: 120,
            term: 1,
        },
    );
    assert!(!backward, "Should reject backwards purge");

    // No movement: 150 -> 150.
    let unchanged = follower.can_purge_logs(
        Some(LogId {
            index: 150,
            term: 1,
        }),
        LogId {
            index: 150,
            term: 1,
        },
    );
    assert!(!unchanged, "Should reject same index purge");
}
/// With no previous purge (`None`) and `commit_index = 100`, `can_purge_logs`
/// must accept index 99 and refuse index 100.
#[test]
fn test_can_purge_logs_handles_initial_state() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
    follower.shared_state.commit_index = 100;

    let below_commit = follower.can_purge_logs(None, LogId { index: 99, term: 1 });
    assert!(below_commit, "Should allow first purge with valid index");

    let at_commit = follower.can_purge_logs(
        None,
        LogId {
            index: 100,
            term: 1,
        },
    );
    assert!(!at_commit, "First purge must still respect gap rule");
}
mod role_violation_tests {
use super::*;
use crate::ConsensusError;
#[tokio::test]
async fn test_follower_rejects_leader_only_events() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let raft_event = RaftEvent::LogPurgeCompleted(LogId { term: 1, index: 1 });
let e = state.handle_raft_event(raft_event, &context, role_tx).await.unwrap_err();
assert!(
matches!(e, Error::Consensus(ConsensusError::RoleViolation { .. })),
"LogPurgeCompleted should return RoleViolation"
);
}
#[tokio::test]
async fn test_follower_ignores_duplicate_create_snapshot_event() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let result1 = state
.handle_raft_event(RaftEvent::CreateSnapshotEvent, &context, role_tx.clone())
.await;
assert!(result1.is_ok(), "First CreateSnapshotEvent should succeed");
assert!(
state.snapshot_in_progress.load(std::sync::atomic::Ordering::SeqCst),
"snapshot_in_progress should be true after first event"
);
let result2 =
state.handle_raft_event(RaftEvent::CreateSnapshotEvent, &context, role_tx).await;
assert!(
result2.is_ok(),
"Second CreateSnapshotEvent should return Ok (ignored)"
);
assert!(
state.snapshot_in_progress.load(std::sync::atomic::Ordering::SeqCst),
"snapshot_in_progress should remain true"
);
}
/// With `snapshot_in_progress` pre-set to `true`, a successful
/// `RaftEvent::SnapshotCreated` must be handled with `Ok` and leave the flag
/// reading `false`.
#[tokio::test]
async fn test_follower_resets_snapshot_flag_on_success() {
    use std::sync::atomic::Ordering::SeqCst;

    let (_shutdown_tx, shutdown_rx) = watch::channel(());
    let (ctx, _tmp_dir) = mock_raft_context_with_temp(shutdown_rx, None);
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, ctx.node_config.clone(), None, None);

    // Pretend a snapshot is already running.
    follower.snapshot_in_progress.store(true, SeqCst);

    // The path is only a value carried by the event; this test does not create the file.
    let snapshot_meta = d_engine_proto::server::storage::SnapshotMetadata {
        last_included: Some(LogId { term: 1, index: 50 }),
        checksum: bytes::Bytes::new(),
    };
    let snapshot_path = std::path::PathBuf::from("/tmp/test_snapshot.bin");
    let created = RaftEvent::SnapshotCreated(Ok((snapshot_meta, snapshot_path)));

    let (role_tx, _role_rx) = mpsc::unbounded_channel();
    let outcome = follower.handle_raft_event(created, &ctx, role_tx).await;
    assert!(outcome.is_ok(), "SnapshotCreated should succeed");

    let still_in_progress = follower.snapshot_in_progress.load(SeqCst);
    assert!(
        !still_in_progress,
        "snapshot_in_progress should be false after SnapshotCreated"
    );
}
#[tokio::test]
async fn test_follower_resets_snapshot_flag_on_failure() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
state.snapshot_in_progress.store(true, std::sync::atomic::Ordering::SeqCst);
let snapshot_result = Err(Error::Fatal("Snapshot creation failed".to_string()));
let (role_tx, _role_rx) = mpsc::unbounded_channel();
let result = state
.handle_raft_event(
RaftEvent::SnapshotCreated(snapshot_result),
&context,
role_tx,
)
.await;
assert!(
result.is_ok(),
"SnapshotCreated with error should return Ok"
);
assert!(
!state.snapshot_in_progress.load(std::sync::atomic::Ordering::SeqCst),
"snapshot_in_progress should be false after failed SnapshotCreated"
);
}
}
mod handle_client_read_request {
use super::*;
use crate::RaftNodeConfig;
use crate::config::ReadConsistencyPolicy as ServerPolicy;
use crate::convert::safe_kv_bytes;
use d_engine_proto::client::ReadConsistencyPolicy as ClientPolicy;
#[tokio::test]
async fn test_follower_rejects_lease_read_policy() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut node_config = RaftNodeConfig::default();
node_config.raft.read_consistency.allow_client_override = true;
let context = MockBuilder::new(graceful_rx).with_node_config(node_config).build_context();
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ClientPolicy::LeaseRead as i32),
keys: vec![safe_kv_bytes(1)],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(result.is_err(), "LeaseRead should be rejected by follower");
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
#[tokio::test]
async fn test_follower_applies_server_default_policy() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut node_config = RaftNodeConfig::default();
node_config.raft.read_consistency.default_policy = ServerPolicy::LinearizableRead;
let context = MockBuilder::new(graceful_rx).with_node_config(node_config).build_context();
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: None, keys: vec![safe_kv_bytes(1)],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(
result.is_err(),
"Default LinearizableRead should be rejected by follower"
);
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
#[tokio::test]
async fn test_follower_serves_eventual_consistency_reads() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut node_config = RaftNodeConfig::default();
node_config.raft.read_consistency.default_policy = ServerPolicy::EventualConsistency;
let context = MockBuilder::new(graceful_rx).with_node_config(node_config).build_context();
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: None, keys: vec![safe_kv_bytes(1)],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let response = resp_rx.recv().await.unwrap().unwrap();
assert_eq!(
response.error,
ErrorCode::Success as i32,
"EventualConsistency read should succeed on follower"
);
}
#[tokio::test]
async fn test_follower_client_override_disabled_falls_back_to_server_eventual() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut node_config = RaftNodeConfig::default();
node_config.raft.read_consistency.default_policy = ServerPolicy::EventualConsistency;
node_config.raft.read_consistency.allow_client_override = false;
let context = MockBuilder::new(graceful_rx).with_node_config(node_config).build_context();
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ClientPolicy::LinearizableRead as i32),
keys: vec![safe_kv_bytes(1)],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let response = resp_rx.recv().await.unwrap().unwrap();
assert_eq!(
response.error,
ErrorCode::Success as i32,
"Follower should serve read using server default EventualConsistency, ignoring client LinearizableRead"
);
}
#[tokio::test]
async fn test_follower_client_override_disabled_falls_back_to_server_linear_rejects() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let mut node_config = RaftNodeConfig::default();
node_config.raft.read_consistency.default_policy = ServerPolicy::LinearizableRead;
node_config.raft.read_consistency.allow_client_override = false;
let context = MockBuilder::new(graceful_rx).with_node_config(node_config).build_context();
let mut state =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);
let client_read_request = ClientReadRequest {
client_id: 1,
consistency_policy: Some(ClientPolicy::EventualConsistency as i32),
keys: vec![safe_kv_bytes(1)],
};
let (resp_tx, mut resp_rx) = MaybeCloneOneshot::new();
let cmd = ClientCmd::Read(client_read_request, resp_tx);
state.push_client_cmd(cmd, &context);
let result = resp_rx.recv().await.expect("channel should not be closed");
assert!(
result.is_err(),
"Follower must reject LinearizableRead (requires leader)"
);
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::FailedPrecondition);
assert!(err.message().contains("Not leader"));
}
}
#[tokio::test]
async fn test_follower_handles_fatal_error_returns_error() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let context = mock_raft_context(
"/tmp/test_follower_handles_fatal_error_returns_error",
graceful_rx,
None,
);
let hard_state = context.storage.raft_log.load_hard_state().expect("Failed to load hard state");
let mut follower =
FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), hard_state, Some(0));
let fatal_error = RaftEvent::FatalError {
source: "StateMachine".to_string(),
error: "Disk failure".to_string(),
};
let (role_tx, mut role_rx) = mpsc::unbounded_channel::<RoleEvent>();
let result = follower.handle_raft_event(fatal_error, &context, role_tx).await;
assert!(
result.is_err(),
"Expected handle_raft_event to return Err, got: {result:?}"
);
match result.unwrap_err() {
Error::Fatal(msg) => {
assert!(
msg.contains("StateMachine"),
"Error message should mention source, got: {msg}"
);
}
other => panic!("Expected Error::Fatal, got: {other:?}"),
}
assert!(
role_rx.try_recv().is_err(),
"No role transition events should be sent during FatalError handling"
);
}
/// When the state machine handler's `should_snapshot` returns `true` for the
/// applied index, handling `ApplyCompleted` must emit exactly one
/// `RoleEvent::ReprocessEvent` wrapping `RaftEvent::CreateSnapshotEvent`.
#[tokio::test]
async fn test_apply_completed_triggers_snapshot_when_condition_met() {
    let (_shutdown_tx, shutdown_rx) = watch::channel(());

    // The mock expects one `should_snapshot` call with exactly this payload.
    let expected_commit = NewCommitData {
        new_commit_index: 100,
        role: NodeRole::Follower as i32,
        current_term: 1,
    };
    let mut sm_handler = crate::MockStateMachineHandler::new();
    sm_handler
        .expect_should_snapshot()
        .with(eq(expected_commit))
        .times(1)
        .returning(|_| true);

    let ctx = MockBuilder::new(shutdown_rx)
        .with_state_machine_handler(sm_handler)
        .build_context();
    let persisted = ctx.storage.raft_log.load_hard_state().expect("Failed to load hard state");
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, ctx.node_config.clone(), persisted, Some(0));
    let (role_tx, mut role_rx) = mpsc::unbounded_channel::<RoleEvent>();

    let applied = RaftEvent::ApplyCompleted {
        last_index: 100,
        results: vec![],
    };
    let result = follower.handle_raft_event(applied, &ctx, role_tx).await;
    assert!(
        result.is_ok(),
        "ApplyCompleted should be handled successfully, got: {result:?}"
    );

    // Unwrap the role event down to the raft event it carries.
    let reprocessed = match role_rx.try_recv().expect("Should receive snapshot event") {
        RoleEvent::ReprocessEvent(boxed_event) => *boxed_event,
        other => panic!("Expected RoleEvent::ReprocessEvent, got: {other:?}"),
    };
    if !matches!(reprocessed, RaftEvent::CreateSnapshotEvent) {
        let other = reprocessed;
        panic!("Expected CreateSnapshotEvent, got: {other:?}");
    }

    assert!(
        role_rx.try_recv().is_err(),
        "Should only send one snapshot event"
    );
}
/// When the state machine handler's `should_snapshot` returns `false`,
/// handling `ApplyCompleted` must succeed without sending any role event.
#[tokio::test]
async fn test_apply_completed_does_not_trigger_snapshot_when_condition_not_met() {
    let (_shutdown_tx, shutdown_rx) = watch::channel(());

    // The mock expects one `should_snapshot` call with exactly this payload.
    let expected_commit = NewCommitData {
        new_commit_index: 50,
        role: NodeRole::Follower as i32,
        current_term: 1,
    };
    let mut sm_handler = crate::MockStateMachineHandler::new();
    sm_handler
        .expect_should_snapshot()
        .with(eq(expected_commit))
        .times(1)
        .returning(|_| false);

    let ctx = MockBuilder::new(shutdown_rx)
        .with_state_machine_handler(sm_handler)
        .build_context();
    let persisted = ctx.storage.raft_log.load_hard_state().expect("Failed to load hard state");
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, ctx.node_config.clone(), persisted, Some(0));
    let (role_tx, mut role_rx) = mpsc::unbounded_channel::<RoleEvent>();

    let applied = RaftEvent::ApplyCompleted {
        last_index: 50,
        results: vec![],
    };
    let outcome = follower.handle_raft_event(applied, &ctx, role_tx).await;
    assert!(
        outcome.is_ok(),
        "ApplyCompleted should be handled successfully"
    );

    let nothing_sent = role_rx.try_recv().is_err();
    assert!(
        nothing_sent,
        "Should not send snapshot event when condition is not met"
    );
}
/// With `raft.snapshot.enable` set to `false`, handling `ApplyCompleted` must
/// succeed without sending any role event.
#[tokio::test]
async fn test_apply_completed_respects_snapshot_disabled_config() {
    let (_shutdown_tx, shutdown_rx) = watch::channel(());

    // No expectations are registered on this mock.
    // NOTE(review): with mockall an unexpected call panics, so this presumably
    // also proves `should_snapshot` is never consulted — confirm.
    let sm_handler = crate::MockStateMachineHandler::new();

    let mut cfg = node_config("/tmp/test_follower_snapshot_disabled");
    cfg.raft.snapshot.enable = false;

    let ctx = MockBuilder::new(shutdown_rx)
        .with_state_machine_handler(sm_handler)
        .with_node_config(cfg)
        .build_context();
    let persisted = ctx.storage.raft_log.load_hard_state().expect("Failed to load hard state");
    let mut follower =
        FollowerState::<MockTypeConfig>::new(1, ctx.node_config.clone(), persisted, Some(0));
    let (role_tx, mut role_rx) = mpsc::unbounded_channel::<RoleEvent>();

    let applied = RaftEvent::ApplyCompleted {
        last_index: 100,
        results: vec![],
    };
    let outcome = follower.handle_raft_event(applied, &ctx, role_tx).await;
    assert!(
        outcome.is_ok(),
        "ApplyCompleted should be handled successfully"
    );

    let nothing_sent = role_rx.try_recv().is_err();
    assert!(
        nothing_sent,
        "Should not send snapshot event when snapshot is disabled in config"
    );
}
/// A follower must answer `LeaseRead` and `LinearizableRead` requests with an
/// error that identifies it as not being the leader, and must do so quickly.
///
/// Both policies go through the same steps, so they are driven from one table:
/// `(policy, key, label)`, where `label` only appears in failure messages.
#[tokio::test]
async fn test_follower_rejects_strong_consistency_reads() {
    let (_graceful_tx, graceful_rx) = watch::channel(());
    let (context, _temp_dir) = mock_raft_context_with_temp(graceful_rx, None);
    let mut state =
        FollowerState::<MockTypeConfig>::new(1, context.node_config.clone(), None, None);

    let cases = [
        (ReadConsistencyPolicy::LeaseRead, "lease_key", "Lease"),
        (ReadConsistencyPolicy::LinearizableRead, "linear_key", "Linear"),
    ];

    for (policy, key, label) in cases {
        let (response_tx, mut response_rx) = MaybeCloneOneshot::new();
        let read_req = ClientReadRequest {
            client_id: 1,
            keys: vec![bytes::Bytes::from(key)],
            consistency_policy: Some(policy as i32),
        };

        let start = tokio::time::Instant::now();
        state.push_client_cmd(ClientCmd::Read(read_req, response_tx), &context);
        let result = response_rx.recv().await;
        let elapsed = start.elapsed();

        assert!(result.is_ok(), "Should receive response from Follower");
        // NOTE(review): this is a wall-clock bound; on a heavily loaded machine
        // it may fail spuriously even when the rejection path is correct.
        assert!(
            elapsed.as_millis() < 10,
            "{label} read rejection should be immediate, took {:?}ms",
            elapsed.as_millis()
        );

        // Anything other than "delivered, and the payload is an error" fails.
        let err = match result {
            Ok(Err(err)) => err,
            other => {
                panic!("{label} read to Follower should return NOT_LEADER error, got: {other:?}")
            }
        };

        // The check is made against the error's Debug rendering and accepts
        // any of these spellings.
        let err_str = format!("{err:?}");
        let mentions_not_leader = ["Not leader", "NotLeader", "NOT_LEADER", "FailedPrecondition"]
            .iter()
            .any(|marker| err_str.contains(*marker));
        assert!(
            mentions_not_leader,
            "Expected NOT_LEADER error for {label} read, got: {err:?}"
        );
    }
}