use std::pin::Pin;
use std::sync::Arc;
use d_engine_core::client::ClientResponse as CoreClientResponse;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::MembershipSnapshot;
use d_engine_proto::client::ScanRequest;
use d_engine_proto::client::ScanResponse;
use d_engine_proto::client::WatchMembershipRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use futures::Stream;
#[derive(Clone, Default)]
pub struct MockRpcService {
pub server_port: Option<u16>,
pub expected_update_cluster_conf_response:
Option<Result<ClusterConfUpdateResponse, tonic::Status>>,
pub expected_client_propose_response: Option<Result<CoreClientResponse, tonic::Status>>,
pub expected_client_read_response: Option<Result<CoreClientResponse, tonic::Status>>,
#[allow(clippy::type_complexity)]
pub expected_metadata_response:
Option<Arc<dyn Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync>>,
pub expected_discover_leader_response: Option<Result<LeaderDiscoveryResponse, tonic::Status>>,
pub expected_watch_events: Option<Result<Vec<WatchResponse>, tonic::Status>>,
pub expected_watch_membership_events: Option<Result<Vec<MembershipSnapshot>, tonic::Status>>,
pub expected_client_scan_response: Option<Result<ScanResponse, tonic::Status>>,
}
impl MockRpcService {
pub fn with_metadata_response(
mut self,
f: impl Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync + 'static,
) -> Self {
self.expected_metadata_response = Some(Arc::new(f));
self
}
pub fn set_port(
&mut self,
port: u16,
) {
self.server_port = Some(port);
}
}
fn core_to_proto_response(r: CoreClientResponse) -> ClientResponse {
use d_engine_core::client::{ClientResponsePayload, WriteResult};
use d_engine_proto::client::{
ClientResult, ReadResults as ProtoReadResults, WriteResult as ProtoWriteResult,
client_response::SuccessResult,
};
use d_engine_proto::error::ErrorMetadata;
let error_code = r.error as i32;
let metadata = if r.leader_hint.is_some() || r.retry_after_ms.is_some() {
Some(ErrorMetadata {
retry_after_ms: r.retry_after_ms,
leader_id: r.leader_hint.as_ref().map(|h| h.leader_id.to_string()),
leader_address: r.leader_hint.map(|h| h.address),
debug_message: None,
})
} else {
None
};
let success_result = r.result.map(|payload| match payload {
ClientResponsePayload::Write(WriteResult { succeeded }) => {
SuccessResult::WriteResult(ProtoWriteResult { succeeded })
}
ClientResponsePayload::Read(rd) => SuccessResult::ReadData(ProtoReadResults {
results: rd
.entries
.into_iter()
.map(|e| ClientResult {
key: e.key,
value: e.value,
})
.collect(),
}),
});
ClientResponse {
error: error_code,
metadata,
success_result,
}
}
#[tonic::async_trait]
impl ClusterManagementService for MockRpcService {
async fn update_cluster_conf(
&self,
_request: tonic::Request<ClusterConfChangeRequest>,
) -> std::result::Result<tonic::Response<ClusterConfUpdateResponse>, tonic::Status> {
match &self.expected_update_cluster_conf_response {
Some(Ok(response)) => Ok(tonic::Response::new(*response)),
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unknown(
"No mock update_cluster_conf response set",
)),
}
}
async fn get_cluster_metadata(
&self,
_request: tonic::Request<MetadataRequest>,
) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
match (&self.expected_metadata_response, self.server_port) {
(Some(f), Some(port)) => f(port).map(tonic::Response::new).map_err(|e| e.clone()),
_ => Err(tonic::Status::unimplemented(
"Metadata response not configured",
)),
}
}
async fn join_cluster(
&self,
_request: tonic::Request<JoinRequest>,
) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
Err(tonic::Status::unimplemented(
"join_cluster removed from client API",
))
}
async fn discover_leader(
&self,
_request: tonic::Request<LeaderDiscoveryRequest>,
) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
match &self.expected_discover_leader_response {
Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unknown(
"No mock discover_leader response set",
)),
}
}
}
#[tonic::async_trait]
impl RaftClientService for MockRpcService {
type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send>>;
type WatchMembershipStream =
Pin<Box<dyn Stream<Item = Result<MembershipSnapshot, tonic::Status>> + Send>>;
async fn handle_client_write(
&self,
_request: tonic::Request<ClientWriteRequest>,
) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
match &self.expected_client_propose_response {
Some(Ok(response)) => Ok(tonic::Response::new(core_to_proto_response(
response.clone(),
))),
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unknown(
"No mock handle_client_write response set",
)),
}
}
async fn handle_client_read(
&self,
_request: tonic::Request<ClientReadRequest>,
) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
match &self.expected_client_read_response {
Some(Ok(response)) => Ok(tonic::Response::new(core_to_proto_response(
response.clone(),
))),
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unknown(
"No mock handle_client_read response set",
)),
}
}
async fn watch(
&self,
_request: tonic::Request<WatchRequest>,
) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
match &self.expected_watch_events {
Some(Ok(events)) => {
let items: Vec<Result<WatchResponse, tonic::Status>> =
events.iter().cloned().map(Ok).collect();
let stream = futures::stream::iter(items);
Ok(tonic::Response::new(Box::pin(stream)))
}
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unimplemented("Watch not configured in mock")),
}
}
async fn watch_membership(
&self,
_request: tonic::Request<WatchMembershipRequest>,
) -> std::result::Result<tonic::Response<Self::WatchMembershipStream>, tonic::Status> {
match &self.expected_watch_membership_events {
Some(Ok(snapshots)) => {
let items: Vec<Result<MembershipSnapshot, tonic::Status>> =
snapshots.iter().cloned().map(Ok).collect();
let stream = futures::stream::iter(items);
Ok(tonic::Response::new(Box::pin(stream)))
}
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unimplemented(
"WatchMembership not configured in mock",
)),
}
}
async fn handle_client_scan(
&self,
_request: tonic::Request<ScanRequest>,
) -> std::result::Result<tonic::Response<ScanResponse>, tonic::Status> {
match &self.expected_client_scan_response {
Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
Some(Err(status)) => Err(status.clone()),
None => Err(tonic::Status::unimplemented("Scan not configured in mock")),
}
}
}