use std::path::PathBuf;
use std::time::Duration;
use crate::ApplyResult;
use d_engine_core::AppendResponseWithUpdates;
use d_engine_core::MockElectionCore;
use d_engine_core::MockMembership;
use d_engine_core::MockRaftLog;
use d_engine_core::MockReplicationCore;
use d_engine_core::MockTypeConfig;
use d_engine_core::RaftNodeConfig;
use d_engine_core::RoleEvent;
use d_engine_core::convert::safe_kv_bytes;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::ReadConsistencyPolicy;
use d_engine_proto::client::WatchMembershipRequest;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::common::LogId;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time;
use tonic::Code;
use tonic::Request;
use tracing_test::traced_test;
use crate::test_utils::MockBuilder;
use crate::test_utils::mock_node;
#[tokio::test]
#[traced_test]
async fn test_handle_service_timeout() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let node = mock_node("/tmp/test_handle_service_timeout", graceful_rx, None);
assert!(
node.request_vote(Request::new(VoteRequest {
term: 1,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
}))
.await
.is_err()
);
assert!(
node.append_entries(Request::new(AppendEntriesRequest {
term: 1,
leader_id: 1,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![],
leader_commit_index: 1
}))
.await
.is_err()
);
assert!(
node.update_cluster_conf(Request::new(ClusterConfChangeRequest {
id: 1,
term: 1,
version: 1,
change: None
}))
.await
.is_err()
);
assert!(
node.handle_client_write(Request::new(ClientWriteRequest {
command: Some(WriteCommand::default()),
client_id: 1,
}))
.await
.is_err()
);
assert!(node.get_cluster_metadata(Request::new(MetadataRequest {})).await.is_err());
assert!(
node.handle_client_read(Request::new(ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::LinearizableRead as i32),
keys: vec![]
}))
.await
.is_err()
);
}
#[tokio::test]
#[traced_test]
async fn test_server_is_not_ready() {
let (_graceful_tx, graceful_rx) = watch::channel(());
let node = mock_node("/tmp/test_server_is_not_ready", graceful_rx, None);
node.set_rpc_ready(false);
let result = node
.request_vote(Request::new(VoteRequest {
term: 1,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
}))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
let result = node
.append_entries(Request::new(AppendEntriesRequest {
term: 1,
leader_id: 1,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![],
leader_commit_index: 1,
}))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
let result = node
.update_cluster_conf(Request::new(ClusterConfChangeRequest {
id: 1,
term: 1,
version: 1,
change: None,
}))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
let result = node
.handle_client_write(Request::new(ClientWriteRequest {
client_id: 1,
command: Some(WriteCommand::default()),
}))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
let result = node.get_cluster_metadata(Request::new(MetadataRequest {})).await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
let result = node
.handle_client_read(Request::new(ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::LinearizableRead as i32),
keys: vec![],
}))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
}
#[tokio::test]
#[traced_test]
async fn test_handle_rpc_services_successfully() {
tokio::time::pause();
let mut settings = RaftNodeConfig::new()
.expect("Should succeed to init RaftNodeConfig.")
.validate()
.expect("Validate RaftNodeConfig successfully");
settings.raft.general_raft_timeout_duration_in_ms = 200;
settings.raft.batching.max_batch_size = 1;
settings.cluster.db_root_dir = PathBuf::from(
"/tmp/
test_handle_rpc_services_successfully",
);
let mut membership = MockMembership::<MockTypeConfig>::new();
membership.expect_voters().returning(Vec::new);
membership.expect_members().returning(Vec::new);
membership.expect_replication_peers().returning(Vec::new);
membership.expect_get_peers_id_with_condition().returning(|_| vec![]);
membership
.expect_update_cluster_conf_from_leader()
.returning(|_, _, _, _, _| Ok(ClusterConfUpdateResponse::success(1, 1, 1)));
membership.expect_get_cluster_conf_version().returning(|| 1);
membership
.expect_retrieve_cluster_membership_config()
.returning(|_current_leader_id| ClusterMembership {
version: 1,
nodes: vec![],
current_leader_id: None,
});
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
let log_index = Arc::new(AtomicU64::new(0));
let li_last = log_index.clone();
let li_flush = log_index.clone();
let li_prepare = log_index.clone();
let mut raft_log = MockRaftLog::new();
raft_log
.expect_last_entry_id()
.returning(move || li_last.load(Ordering::Relaxed));
raft_log.expect_flush().returning(|| Ok(()));
raft_log.expect_calculate_majority_matched_index().returning(|_, _, _| None);
raft_log.expect_load_hard_state().returning(|| Ok(None));
raft_log.expect_save_hard_state().returning(|_| Ok(()));
raft_log.expect_last_log_id().returning(|| None);
let mut replication_handler = MockReplicationCore::<MockTypeConfig>::new();
replication_handler
.expect_check_append_entries_request_is_legal()
.returning(|my_term, _, _| AppendEntriesResponse::success(1, my_term, None));
replication_handler.expect_handle_append_entries().returning(move |_, _, _| {
Ok(AppendResponseWithUpdates {
response: AppendEntriesResponse::success(1, 1, Some(LogId { term: 1, index: 1 })),
commit_index_update: Some(1),
})
});
replication_handler
.expect_prepare_batch_requests()
.returning(move |payloads, _, _, _, _| {
li_prepare.fetch_add(payloads.len() as u64, Ordering::Relaxed);
Ok(d_engine_core::PrepareResult::default())
});
let mut election_handler = MockElectionCore::<MockTypeConfig>::new();
election_handler
.expect_broadcast_vote_requests()
.returning(|_, _, _, _, _| Ok(()));
election_handler
.expect_check_vote_request_is_legal()
.returning(|_, _, _, _, _| false);
let (_graceful_tx, graceful_rx) = watch::channel(());
let (role_tx, role_rx) = mpsc::unbounded_channel::<RoleEvent>();
let test_role_tx = role_tx.clone();
let mut builder = MockBuilder::new(graceful_rx);
builder.role_tx = Some(role_tx);
builder.role_rx = Some(role_rx);
let node = builder
.with_raft_log(raft_log)
.with_membership(membership)
.with_replication_handler(replication_handler)
.with_election_handler(election_handler)
.with_node_config(settings)
.build_node();
node.set_rpc_ready(true);
let raft_lock = node.raft_core.clone();
let raft_handle = tokio::spawn(async move {
let mut raft = raft_lock.lock().await;
let _ = time::timeout(Duration::from_millis(100), raft.run()).await;
});
tokio::time::advance(Duration::from_millis(2)).await;
tokio::time::sleep(Duration::from_millis(2)).await;
let service_handler = tokio::spawn(async move {
assert!(
node.request_vote(Request::new(VoteRequest {
term: 1,
candidate_id: 1,
last_log_index: 0,
last_log_term: 0,
}))
.await
.is_ok()
);
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::spawn(async move {
for _ in 0..10 {
tokio::task::yield_now().await;
}
let flush_idx = li_flush.load(Ordering::Relaxed);
let _ = test_role_tx.send(RoleEvent::LogFlushed {
durable_index: flush_idx,
});
for _ in 0..5 {
tokio::task::yield_now().await;
}
let _ = test_role_tx.send(RoleEvent::ApplyCompleted {
last_index: 2,
results: vec![ApplyResult {
index: 2,
succeeded: true,
}],
});
});
assert!(
node.handle_client_write(Request::new(ClientWriteRequest {
client_id: 1,
command: Some(WriteCommand::delete(safe_kv_bytes(1))),
}))
.await
.is_ok()
);
assert!(node.get_cluster_metadata(Request::new(MetadataRequest {})).await.is_ok());
assert!(
node.handle_client_read(Request::new(ClientReadRequest {
client_id: 1,
consistency_policy: Some(ReadConsistencyPolicy::EventualConsistency as i32),
keys: vec![],
}))
.await
.is_ok()
);
assert!(
node.append_entries(Request::new(AppendEntriesRequest {
term: 1,
leader_id: 1,
prev_log_index: 0,
prev_log_term: 0,
entries: vec![],
leader_commit_index: 1,
}))
.await
.is_ok()
);
assert!(
node.update_cluster_conf(Request::new(ClusterConfChangeRequest {
id: 1,
term: 1,
version: 1,
change: None
}))
.await
.is_ok()
);
});
tokio::time::sleep(Duration::from_millis(100)).await;
let (_, service_response) = tokio::join!(raft_handle, service_handler,);
println!("{service_response:?}",);
assert!(service_response.is_ok());
}
#[tokio::test]
#[traced_test]
async fn test_watch_membership_returns_unavailable_when_node_not_ready() {
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let node = mock_node("/tmp/test_watch_membership_not_ready", shutdown_rx, None);
node.set_rpc_ready(false);
let result = node
.watch_membership(Request::new(WatchMembershipRequest { client_id: 1 }))
.await;
assert!(result.is_err());
assert_eq!(result.err().unwrap().code(), Code::Unavailable);
}
#[tokio::test]
#[traced_test]
async fn test_watch_membership_yields_current_snapshot_then_sentinel_on_sender_drop() {
use tokio_stream::StreamExt;
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let node = mock_node(
"/tmp/test_watch_membership_snapshot_then_sentinel",
shutdown_rx,
None,
);
node.set_rpc_ready(true);
let response = node
.watch_membership(Request::new(WatchMembershipRequest { client_id: 1 }))
.await
.expect("watch_membership should succeed when node is ready");
let mut stream = response.into_inner();
let first = stream.next().await.expect("stream must yield at least one item");
assert!(first.is_ok(), "first item must be Ok(snapshot)");
let snap = first.unwrap();
assert!(snap.members.is_empty(), "default snapshot has no voters");
assert!(snap.learners.is_empty(), "default snapshot has no learners");
assert_eq!(snap.committed_index, 0, "default committed_index is 0");
let second = stream.next().await.expect("stream must yield sentinel");
assert!(second.is_err(), "sentinel must be Err(UNAVAILABLE)");
assert_eq!(second.unwrap_err().code(), Code::Unavailable);
assert!(stream.next().await.is_none());
}
#[tokio::test]
#[traced_test]
async fn test_handle_client_scan_not_ready_returns_unavailable() {
use d_engine_proto::client::ScanRequest;
let (_graceful_tx, graceful_rx) = watch::channel(());
let node = mock_node("/tmp/test_handle_client_scan_not_ready", graceful_rx, None);
node.set_rpc_ready(false);
let result = node
.handle_client_scan(Request::new(ScanRequest {
client_id: 1,
prefix: bytes::Bytes::from("/services/"),
}))
.await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().code(), Code::Unavailable);
}
#[tokio::test]
#[traced_test]
async fn test_handle_client_scan_timeout() {
use d_engine_proto::client::ScanRequest;
let (_graceful_tx, graceful_rx) = watch::channel(());
let node = mock_node("/tmp/test_handle_client_scan_timeout", graceful_rx, None);
node.set_rpc_ready(true);
let result = node
.handle_client_scan(Request::new(ScanRequest {
client_id: 1,
prefix: bytes::Bytes::from("/services/"),
}))
.await;
assert!(result.is_err(), "scan must error when Raft actor is absent");
}