use std::collections::HashMap;
use d_engine_core::ConnectionType;
use d_engine_core::Error;
use d_engine_core::MockMembership;
use d_engine_core::MockTypeConfig;
use d_engine_core::NetworkError;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RetryPolicies;
use d_engine_core::SystemError;
use d_engine_core::Transport;
use d_engine_proto::common::LogId;
use d_engine_proto::common::NodeRole::Candidate;
use d_engine_proto::common::NodeRole::Follower;
use d_engine_proto::common::NodeRole::Leader;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::NodeMeta;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::SnapshotChunk;
use futures::StreamExt;
use futures::stream;
use futures::stream::BoxStream;
use tokio::sync::oneshot;
use tonic::Status;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
use tracing_test::traced_test;
use super::*;
use crate::network::grpc::grpc_transport::GrpcTransport;
use crate::test_utils::MockNode;
use crate::test_utils::create_test_chunk;
use crate::test_utils::create_test_snapshot_stream;
/// Builds a validated [`RaftNodeConfig`] whose database root directory is `db_path`.
///
/// Panics if the default configuration cannot be created or if validation fails.
fn node_config(db_path: &str) -> RaftNodeConfig {
    let mut config = RaftNodeConfig::new().expect("RaftNodeConfig should be inited successfully");
    config.cluster.db_root_dir = db_path.into();

    let validated = config.validate();
    validated.expect("RaftNodeConfig should be validated successfully")
}
/// Creates a mocked membership for the transport tests.
///
/// * `voters()` returns one active [`NodeMeta`] per `(id, role)` pair in `peers`; every entry
///   carries the same fixed placeholder address.
/// * `get_peer_channel()` looks the `(peer_id, connection type)` key up in `channels` and
///   returns `None` when no channel was registered for it.
fn mock_membership(
    peers: Vec<(u32, i32)>,
    channels: HashMap<(u32, ConnectionType), Channel>,
) -> Arc<MockMembership<MockTypeConfig>> {
    let mut mock = MockMembership::<MockTypeConfig>::new();

    mock.expect_voters().returning(move || {
        let mut voters = Vec::with_capacity(peers.len());
        for &(id, role) in &peers {
            voters.push(NodeMeta {
                id,
                address: "127.0.0.1:50051".to_string(),
                role,
                status: NodeStatus::Active.into(),
            });
        }
        voters
    });

    mock.expect_get_peer_channel().returning(move |peer_id, conn_type| {
        let key = (peer_id, conn_type);
        channels.get(&key).cloned()
    });

    Arc::new(mock)
}
/// With no voters in the membership, `send_cluster_update` must fail with
/// `NetworkError::EmptyPeerList`.
#[tokio::test]
#[traced_test]
async fn test_send_cluster_update_case1() {
    let my_id = 1;

    let mut config = node_config("/tmp/test_send_cluster_update_case1");
    config.retry.membership.max_retries = 1;

    // Neither voters nor channels are registered.
    let membership = mock_membership(Vec::new(), HashMap::new());
    let transport: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    let conf_change = ClusterConfChangeRequest {
        id: 1,
        term: 1,
        version: 1,
        change: None,
    };
    let outcome = transport.send_cluster_update(conf_change, &config.retry, membership).await;

    assert!(matches!(
        outcome.unwrap_err(),
        Error::System(SystemError::Network(NetworkError::EmptyPeerList { .. }))
    ));
}
/// The membership lists only the local node as a voter. The call must succeed and report
/// nothing: both `responses` and `peer_ids` are expected to be empty.
#[tokio::test]
#[traced_test]
async fn test_send_cluster_update_case2() {
    let my_id = 1;
    let mut node_config = node_config("/tmp/test_send_cluster_update_case2");
    node_config.retry.membership.max_retries = 1;
    let request = ClusterConfChangeRequest {
        id: 1,
        term: 1,
        version: 1,
        change: None,
    };

    // `_tx` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx, rx) = oneshot::channel::<()>();
    let response = ClusterMembership {
        version: 1,
        nodes: vec![],
        current_leader_id: None,
    };
    let (channel, _port) = MockNode::simulate_mock_service_with_cluster_conf_reps(
        rx,
        Some(Box::new(move |_port| Ok(response.clone()))),
    )
    .await
    .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((my_id, ConnectionType::Control), channel);
    let membership = mock_membership(vec![(my_id, Follower as i32)], channels);
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` surfaces the underlying error instead of a bare, message-less panic.
    let res = client
        .send_cluster_update(request, &node_config.retry, membership)
        .await
        .expect("send_cluster_update should succeed when the only voter is the local node");
    assert!(res.responses.is_empty());
    assert!(res.peer_ids.is_empty());
}
/// Two voters (a follower and a candidate) share one lazily-connected channel. The call must
/// return `Ok` and report one entry per peer in both `responses` and `peer_ids`.
///
/// NOTE(review): no mock server is started in this test, so the per-peer RPCs presumably
/// fail — confirm that the two collected responses are meant to be failures.
#[tokio::test]
#[traced_test]
async fn test_send_cluster_update_case3() {
    let my_id = 1;
    let peer1_id = 2;
    let peer2_id = 3;
    let mut node_config = node_config("/tmp/test_send_cluster_update_case3");
    node_config.retry.membership.max_retries = 1;
    let request = ClusterConfChangeRequest {
        id: 1,
        term: 1,
        version: 1,
        change: None,
    };

    let channel = Endpoint::from_static("http://[::]:50051").connect_lazy();
    let mut channels = HashMap::new();
    channels.insert((peer1_id, ConnectionType::Control), channel.clone());
    channels.insert((peer2_id, ConnectionType::Control), channel);
    let membership = mock_membership(
        vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)],
        channels,
    );
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_cluster_update(request, &node_config.retry, membership)
        .await
        .expect("send_cluster_update should return Ok even when peers are unreachable");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// Both peers are served by one mock service that answers with `Status::unavailable`.
/// The call must still return `Ok` and report one entry per peer in both `responses` and
/// `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_cluster_update_case4() {
    let my_id = 1;
    let peer1_id = 2;
    let peer2_id = 3;
    let mut node_config = node_config("/tmp/test_send_cluster_update_case4");
    node_config.retry.membership.max_retries = 1;
    let request = ClusterConfChangeRequest {
        id: 1,
        term: 1,
        version: 1,
        change: None,
    };

    // `_tx` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx, rx) = oneshot::channel::<()>();
    let (channel, _port) = MockNode::simulate_mock_service_with_cluster_conf_reps(
        rx,
        Some(Box::new(move |_port| {
            Err(Status::unavailable("message".to_string()))
        })),
    )
    .await
    .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((peer1_id, ConnectionType::Control), channel.clone());
    channels.insert((peer2_id, ConnectionType::Control), channel);
    let membership = mock_membership(
        vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)],
        channels,
    );
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_cluster_update(request, &node_config.retry, membership)
        .await
        .expect("send_cluster_update should return Ok even when peers answer with an error");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// With an empty request batch and no voters, `send_append_requests` must fail with
/// `NetworkError::EmptyPeerList`.
#[tokio::test]
#[traced_test]
async fn test_send_append_requests_case1() {
    let my_id = 1;
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);
    let membership = mock_membership(vec![], HashMap::new());

    match client
        .send_append_requests(vec![], &RetryPolicies::default(), membership, false)
        .await
    {
        Ok(_) => panic!("send_append_requests should fail with EmptyPeerList for an empty batch"),
        // The pattern has no bindings, so `e` is still available for the failure message.
        Err(e) => assert!(
            matches!(
                e,
                Error::System(SystemError::Network(NetworkError::EmptyPeerList { .. }))
            ),
            "unexpected error: {e:?}"
        ),
    }
}
/// The only request in the batch is addressed to the sending node itself. The call must
/// succeed and report nothing: both `responses` and `peer_ids` are expected to be empty.
#[tokio::test]
#[traced_test]
async fn test_send_append_requests_case2() {
    let leader_id = 1;
    // The transport under test runs on the leader, so both ids are the same node.
    let my_id = leader_id;
    let leader_current_term = 1;
    let leader_commit_index = 1;
    let peer_2_id = 2;
    let peer_2_term = 1;
    let peer_2_match_index = 1;

    let response = AppendEntriesResponse::success(
        peer_2_id,
        peer_2_term,
        Some(LogId {
            term: peer_2_term,
            index: peer_2_match_index,
        }),
    );
    // `_tx` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx, rx) = oneshot::channel::<()>();
    let (channel, _port) = MockNode::simulate_append_entries_mock_server(Ok(response), rx)
        .await
        .expect("should succeed");

    let request = AppendEntriesRequest {
        term: leader_current_term,
        leader_id,
        prev_log_index: 1,
        prev_log_term: 1,
        entries: vec![],
        leader_commit_index,
    };
    let requests_with_peer_address = vec![(leader_id, request)];

    let mut channels = HashMap::new();
    channels.insert((leader_id, ConnectionType::Control), channel);
    let membership = mock_membership(vec![(leader_id, Leader as i32)], channels);
    let node_config = node_config("/tmp/test_send_append_requests_case2");
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` surfaces the underlying error instead of a bare, message-less panic.
    let res = client
        .send_append_requests(
            requests_with_peer_address,
            &node_config.retry,
            membership,
            false,
        )
        .await
        .expect("send_append_requests should succeed when the only target is the local node");
    assert!(res.responses.is_empty());
    assert!(res.peer_ids.is_empty());
}
/// Two peers are targeted, each behind its own mock server: peer 2 replies with a success
/// response and peer 3 with a conflict response. The call must return `Ok` and report one
/// entry per peer in both `responses` and `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_append_requests_case3_1() {
    let leader_id = 1;
    let my_id = leader_id;
    let leader_current_term = 1;
    let leader_commit_index = 1;
    let peer_2_id = 2;
    let peer_3_id = 3;
    let peer_2_term = leader_current_term;
    let peer_2_match_index = 10;
    let peer_3_term = leader_current_term;
    let peer_3_match_index = 1;

    let peer_2_response = AppendEntriesResponse::success(
        peer_2_id,
        peer_2_term,
        Some(LogId {
            term: peer_2_term,
            index: peer_2_match_index,
        }),
    );
    let peer_3_response = AppendEntriesResponse::conflict(
        peer_3_id,
        peer_3_term,
        Some(peer_3_term),
        Some(peer_3_match_index),
    );

    // `_tx2` / `_tx3` stay bound for the whole test so the senders are not dropped early;
    // presumably they are the mock servers' shutdown signals — TODO confirm.
    let (_tx2, rx2) = oneshot::channel::<()>();
    let (channel2, _port2) =
        MockNode::simulate_append_entries_mock_server(Ok(peer_2_response), rx2)
            .await
            .expect("should succeed");
    let (_tx3, rx3) = oneshot::channel::<()>();
    let (channel3, _port3) =
        MockNode::simulate_append_entries_mock_server(Ok(peer_3_response), rx3)
            .await
            .expect("should succeed");

    let peer_req = AppendEntriesRequest {
        term: leader_current_term,
        leader_id,
        prev_log_index: 1,
        prev_log_term: 1,
        entries: vec![],
        leader_commit_index,
    };
    let requests_with_peer_address = vec![(peer_2_id, peer_req.clone()), (peer_3_id, peer_req)];

    let mut channels = HashMap::new();
    channels.insert((peer_2_id, ConnectionType::Data), channel2);
    channels.insert((peer_3_id, ConnectionType::Data), channel3);
    let membership = mock_membership(vec![(leader_id, Leader as i32)], channels);
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_append_requests(
            requests_with_peer_address,
            &node_config.retry,
            membership,
            true,
        )
        .await
        .expect("send_append_requests should return Ok for a success and a conflict reply");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// Two peers are targeted, each behind its own mock server: peer 2 replies with a success
/// response and peer 3 with `Status::unavailable`. The call must return `Ok` and report one
/// entry per peer in both `responses` and `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_append_requests_case3_2() {
    let leader_id = 1;
    let my_id = leader_id;
    let leader_current_term = 1;
    let leader_commit_index = 1;
    let peer_2_id = 2;
    let peer_3_id = 3;
    let peer_2_term = leader_current_term;
    let peer_2_match_index = 10;

    let peer_2_response = AppendEntriesResponse::success(
        peer_2_id,
        peer_2_term,
        Some(LogId {
            term: peer_2_term,
            index: peer_2_match_index,
        }),
    );
    let peer_3_response = Err(Status::unavailable("Service is not ready"));

    // `_tx2` / `_tx3` stay bound for the whole test so the senders are not dropped early;
    // presumably they are the mock servers' shutdown signals — TODO confirm.
    let (_tx2, rx2) = oneshot::channel::<()>();
    let (channel2, _port2) =
        MockNode::simulate_append_entries_mock_server(Ok(peer_2_response), rx2)
            .await
            .expect("should succeed");
    let (_tx3, rx3) = oneshot::channel::<()>();
    let (channel3, _port3) = MockNode::simulate_append_entries_mock_server(peer_3_response, rx3)
        .await
        .expect("should succeed");

    let peer_req = AppendEntriesRequest {
        term: leader_current_term,
        leader_id,
        prev_log_index: 1,
        prev_log_term: 1,
        entries: vec![],
        leader_commit_index,
    };
    let requests_with_peer_address = vec![(peer_2_id, peer_req.clone()), (peer_3_id, peer_req)];

    let mut channels = HashMap::new();
    channels.insert((peer_2_id, ConnectionType::Data), channel2);
    channels.insert((peer_3_id, ConnectionType::Data), channel3);
    let membership = mock_membership(vec![(leader_id, Leader as i32)], channels);
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_append_requests(
            requests_with_peer_address,
            &node_config.retry,
            membership,
            true,
        )
        .await
        .expect("send_append_requests should return Ok even when one peer is unavailable");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// With no voters in the membership, `send_vote_requests` must fail with
/// `NetworkError::EmptyPeerList`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case1() {
    let my_id = 1;
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let request = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: 1,
    };
    let membership = mock_membership(vec![], HashMap::new());
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    match client.send_vote_requests(request, &node_config.retry, membership).await {
        Ok(_) => panic!("send_vote_requests should fail with EmptyPeerList when there are no voters"),
        // The pattern has no bindings, so `e` is still available for the failure message.
        Err(e) => assert!(
            matches!(
                e,
                Error::System(SystemError::Network(NetworkError::EmptyPeerList { .. }))
            ),
            "unexpected error: {e:?}"
        ),
    }
}
/// The membership lists only the local node as a voter. The call must succeed and report
/// nothing: both `responses` and `peer_ids` are expected to be empty.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case2() {
    let my_id = 1;
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");

    // `_tx1` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let vote_response = VoteResponse {
        term: 1,
        vote_granted: true,
        last_log_index: 0,
        last_log_term: 0,
    };
    let request = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: 1,
    };
    let (channel, _port) = MockNode::simulate_send_votes_mock_server(vote_response, rx1)
        .await
        .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((my_id, ConnectionType::Control), channel);
    let membership = mock_membership(vec![(my_id, Follower as i32)], channels);
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` surfaces the underlying error instead of a bare, message-less panic.
    let res = client
        .send_vote_requests(request, &node_config.retry, membership)
        .await
        .expect("send_vote_requests should succeed when the only voter is the local node");
    assert!(res.responses.is_empty());
    assert!(res.peer_ids.is_empty());
}
/// Two voters share one mock server that grants the vote. The call must return `Ok` and
/// report one entry per peer in both `responses` and `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case3() {
    let my_id = 1;
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let peer1_id = 2;
    let peer2_id = 3;

    // `_tx1` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let vote_response = VoteResponse {
        term: 1,
        vote_granted: true,
        last_log_index: 0,
        last_log_term: 0,
    };
    let request = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: 1,
    };
    let (channel, _port) = MockNode::simulate_send_votes_mock_server(vote_response, rx1)
        .await
        .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((peer1_id, ConnectionType::Control), channel.clone());
    channels.insert((peer2_id, ConnectionType::Control), channel);
    let membership = mock_membership(
        vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)],
        channels,
    );
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_vote_requests(request, &node_config.retry, membership)
        .await
        .expect("send_vote_requests should return Ok when both peers grant the vote");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// Two voters share one mock server that rejects the vote (`vote_granted: false`). The call
/// must return `Ok` and report one entry per peer in both `responses` and `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case4_1() {
    let my_id = 1;
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let peer1_id = 2;
    let peer2_id = 3;

    // `_tx1` stays bound for the whole test so the sender is not dropped early;
    // presumably it is the mock server's shutdown signal — TODO confirm.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let vote_response = VoteResponse {
        term: 1,
        vote_granted: false,
        last_log_index: 0,
        last_log_term: 0,
    };
    let request = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: 1,
    };
    let (channel, _port) = MockNode::simulate_send_votes_mock_server(vote_response, rx1)
        .await
        .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((peer1_id, ConnectionType::Control), channel.clone());
    channels.insert((peer2_id, ConnectionType::Control), channel);
    let membership = mock_membership(
        vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)],
        channels,
    );
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_vote_requests(request, &node_config.retry, membership)
        .await
        .expect("send_vote_requests should return Ok when both peers reject the vote");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
/// Both peers share one mock server that rejects the vote and reports a `last_log_term` one
/// higher than the candidate's. The test only requires the call to return `Ok`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case4_2() {
    let my_id = 1;
    let config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let (peer1_id, peer2_id) = (2, 3);
    let my_last_log_term = 3;

    // The sender half is kept bound until the end of the test.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let rejection = VoteResponse {
        term: 1,
        vote_granted: false,
        last_log_index: 1,
        last_log_term: my_last_log_term + 1,
    };
    let ballot = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: my_last_log_term,
    };
    let (channel, _port) = MockNode::simulate_send_votes_mock_server(rejection, rx1)
        .await
        .expect("should succeed");

    let channels = HashMap::from([
        ((peer1_id, ConnectionType::Control), channel.clone()),
        ((peer2_id, ConnectionType::Control), channel.clone()),
    ]);
    let voters = vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)];
    let membership = mock_membership(voters, channels);

    let transport: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);
    let outcome = transport.send_vote_requests(ballot, &config.retry, membership).await;
    assert!(outcome.is_ok());
}
/// Both peers share one mock server that rejects the vote and reports the same
/// `last_log_term` as the candidate but a `last_log_index` one higher. The test only
/// requires the call to return `Ok`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case4_3() {
    let my_id = 1;
    let config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let (peer1_id, peer2_id) = (2, 3);
    let my_last_log_index = 1;
    let my_last_log_term = 3;

    // The sender half is kept bound until the end of the test.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let rejection = VoteResponse {
        term: 1,
        vote_granted: false,
        last_log_index: my_last_log_index + 1,
        last_log_term: my_last_log_term,
    };
    let ballot = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: my_last_log_index,
        last_log_term: my_last_log_term,
    };
    let (channel, _port) = MockNode::simulate_send_votes_mock_server(rejection, rx1)
        .await
        .expect("should succeed");

    let channels = HashMap::from([
        ((peer1_id, ConnectionType::Control), channel.clone()),
        ((peer2_id, ConnectionType::Control), channel.clone()),
    ]);
    let voters = vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)];
    let membership = mock_membership(voters, channels);

    let transport: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);
    let outcome = transport.send_vote_requests(ballot, &config.retry, membership).await;
    assert!(outcome.is_ok());
}
/// Two voters sit behind separate mock servers: peer 1 rejects the vote and peer 2 grants
/// it. The call must return `Ok` and report one entry per peer in both `responses` and
/// `peer_ids`.
#[tokio::test]
#[traced_test]
async fn test_send_vote_requests_case5() {
    let my_id = 1;
    let node_config = RaftNodeConfig::new()
        .expect("Should succeed to init RaftNodeConfig.")
        .validate()
        .expect("Validate RaftNodeConfig successfully");
    let peer1_id = 2;
    let peer2_id = 3;

    // `_tx1` / `_tx2` stay bound for the whole test so the senders are not dropped early;
    // presumably they are the mock servers' shutdown signals — TODO confirm.
    let (_tx1, rx1) = oneshot::channel::<()>();
    let (_tx2, rx2) = oneshot::channel::<()>();
    let peer1_response = VoteResponse {
        term: 1,
        vote_granted: false,
        last_log_index: 0,
        last_log_term: 0,
    };
    let peer2_response = VoteResponse {
        term: 1,
        vote_granted: true,
        last_log_index: 0,
        last_log_term: 0,
    };
    let request = VoteRequest {
        term: 1,
        candidate_id: my_id,
        last_log_index: 1,
        last_log_term: 1,
    };
    let (channel1, _port1) = MockNode::simulate_send_votes_mock_server(peer1_response, rx1)
        .await
        .expect("should succeed");
    let (channel2, _port2) = MockNode::simulate_send_votes_mock_server(peer2_response, rx2)
        .await
        .expect("should succeed");

    let mut channels = HashMap::new();
    channels.insert((peer1_id, ConnectionType::Control), channel1);
    channels.insert((peer2_id, ConnectionType::Control), channel2);
    let membership = mock_membership(
        vec![(peer1_id, Follower as i32), (peer2_id, Candidate as i32)],
        channels,
    );
    let client: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);

    // `expect` / `assert_eq!` report the error and the actual counts on failure.
    let res = client
        .send_vote_requests(request, &node_config.retry, membership)
        .await
        .expect("send_vote_requests should return Ok for mixed vote replies");
    assert_eq!(res.responses.len(), 2);
    assert_eq!(res.peer_ids.len(), 2);
}
#[allow(unused)]
fn create_failing_stream(fail_at: usize) -> BoxStream<'static, Result<SnapshotChunk>> {
let mut chunks = vec![];
for i in 0..5 {
let data = vec![i as u8; 1024];
chunks.push(create_test_chunk(i as u32, &data, 1, 1, 5));
}
let stream = create_test_snapshot_stream(chunks);
Box::pin(stream::unfold(
(stream, 0),
move |(mut stream, count)| async move {
if count == fail_at {
Some((
Err(Error::Fatal("Injected failure".to_string())),
(stream, count + 1),
))
} else {
match stream.next().await {
Some(Ok(chunk)) => Some((Ok(chunk), (stream, count + 1))),
Some(Err(e)) => {
Some((Err(Error::Fatal(format!("{e:?}",))), (stream, count + 1)))
}
None => None,
}
}
},
))
}
/// Checks that a `GrpcTransport` reports active background tasks after
/// `send_append_requests` has been called, then drops the transport.
///
/// NOTE(review): nothing is asserted after `drop(transport)`, so the abort-on-drop behavior
/// is exercised but not observed by this test.
#[tokio::test]
#[traced_test]
async fn test_grpc_transport_drop_aborts_tasks() {
    let my_id = 1;
    let peer_id = 2;

    // Lazily connected channel: no server is started in this test.
    let channel = Endpoint::from_static("http://[::]:50051").connect_lazy();

    let mut membership = MockMembership::<MockTypeConfig>::new();
    membership.expect_voters().returning(move || {
        vec![NodeMeta {
            id: peer_id,
            address: "127.0.0.1:50051".to_string(),
            role: Follower as i32,
            status: NodeStatus::Active as i32,
        }]
    });
    // Every (peer, connection type) lookup resolves to the same lazy channel.
    membership
        .expect_get_peer_channel()
        .returning(move |_, _| Some(channel.clone()));
    let membership = Arc::new(membership);

    let transport: GrpcTransport<MockTypeConfig> = GrpcTransport::new(my_id);
    let retry = RetryPolicies::default();
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: my_id,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![],
        leader_commit_index: 0,
    };

    // The RPC outcome is irrelevant here; only the task bookkeeping is checked.
    let _result = transport
        .send_append_requests(vec![(peer_id, request)], &retry, membership, false)
        .await;
    assert!(
        transport.has_active_tasks(),
        "Background task should be running after send_append_requests"
    );

    drop(transport);
}