// d-engine-client 0.2.4
//
// Client library for interacting with d-engine Raft clusters via gRPC.
// Documentation
use std::pin::Pin;
use std::sync::Arc;

use d_engine_core::client::ClientResponse as CoreClientResponse;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::MembershipSnapshot;
use d_engine_proto::client::ScanRequest;
use d_engine_proto::client::ScanResponse;
use d_engine_proto::client::WatchMembershipRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use futures::Stream;

/// Configurable mock that implements the gRPC service traits
/// `ClusterManagementService` and `RaftClientService` defined for it in this file.
///
/// Each `expected_*` field holds the canned outcome for one RPC handler. A
/// handler hands back a copy of the configured outcome, or an error status
/// when its field is `None`. With `Default`, every field is `None`.
#[derive(Clone, Default)]
pub struct MockRpcService {
    /// Port passed to the metadata closure by `get_cluster_metadata`. While
    /// this is `None`, that handler answers with an `unimplemented` status.
    pub server_port: Option<u16>,
    // Expected responses for each method
    /// Outcome returned by `update_cluster_conf`; `None` yields an `unknown` status.
    pub expected_update_cluster_conf_response:
        Option<Result<ClusterConfUpdateResponse, tonic::Status>>,
    /// Outcome returned by `handle_client_write` after conversion to the proto
    /// response type; `None` yields an `unknown` status.
    pub expected_client_propose_response: Option<Result<CoreClientResponse, tonic::Status>>,
    /// Outcome returned by `handle_client_read` after conversion to the proto
    /// response type; `None` yields an `unknown` status.
    pub expected_client_read_response: Option<Result<CoreClientResponse, tonic::Status>>,

    /// Closure called by `get_cluster_metadata` with `server_port` to produce
    /// the response. If either the closure or the port is missing, the handler
    /// answers with an `unimplemented` status.
    #[allow(clippy::type_complexity)]
    pub expected_metadata_response:
        Option<Arc<dyn Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync>>,

    /// Outcome returned by `discover_leader`; `None` yields an `unknown` status.
    pub expected_discover_leader_response: Option<Result<LeaderDiscoveryResponse, tonic::Status>>,

    /// Watch stream events to emit, in order. `Some(Err(_))` makes `watch`
    /// fail with that status; `None` makes it fail with `unimplemented`.
    pub expected_watch_events: Option<Result<Vec<WatchResponse>, tonic::Status>>,

    /// Membership watch stream snapshots to emit, in order. `Some(Err(_))`
    /// makes `watch_membership` fail with that status; `None` makes it fail
    /// with `unimplemented`.
    pub expected_watch_membership_events: Option<Result<Vec<MembershipSnapshot>, tonic::Status>>,

    /// Scan response. None means return unimplemented.
    pub expected_client_scan_response: Option<Result<ScanResponse, tonic::Status>>,
}
impl MockRpcService {
    /// Builder-style setter: consumes the mock and returns it with `f`
    /// installed as the metadata closure.
    ///
    /// `f` receives the configured server port when `get_cluster_metadata`
    /// is called.
    pub fn with_metadata_response(
        self,
        f: impl Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync + 'static,
    ) -> Self {
        Self {
            expected_metadata_response: Some(Arc::new(f)),
            ..self
        }
    }

    /// Records the port that will be passed to the metadata closure.
    pub fn set_port(&mut self, port: u16) {
        self.server_port = Some(port);
    }
}

/// Translate a core `ClientResponse` into the proto wire type used by the mock
/// gRPC handlers.
///
/// The error code is carried over with an `as i32` cast, which relies on the
/// core and proto `ErrorCode` enums sharing the same discriminants. Error
/// metadata is attached only when a leader hint or a retry delay is present.
fn core_to_proto_response(r: CoreClientResponse) -> ClientResponse {
    use d_engine_core::client::ClientResponsePayload;
    use d_engine_proto::client::{
        ClientResult, ReadResults as ProtoReadResults, WriteResult as ProtoWriteResult,
        client_response::SuccessResult,
    };
    use d_engine_proto::error::ErrorMetadata;

    let error = r.error as i32;

    let metadata = match (r.leader_hint, r.retry_after_ms) {
        // Nothing to report: omit the metadata message entirely.
        (None, None) => None,
        (hint, retry_after_ms) => {
            let (leader_id, leader_address) = match hint {
                Some(h) => (Some(h.leader_id.to_string()), Some(h.address)),
                None => (None, None),
            };
            Some(ErrorMetadata {
                retry_after_ms,
                leader_id,
                leader_address,
                debug_message: None,
            })
        }
    };

    // Maps a core success payload onto the matching proto variant.
    let to_success = |payload: ClientResponsePayload| match payload {
        ClientResponsePayload::Write(written) => SuccessResult::WriteResult(ProtoWriteResult {
            succeeded: written.succeeded,
        }),
        ClientResponsePayload::Read(read) => {
            let results = read
                .entries
                .into_iter()
                .map(|entry| ClientResult {
                    key: entry.key,
                    value: entry.value,
                })
                .collect();
            SuccessResult::ReadData(ProtoReadResults { results })
        }
    };

    ClientResponse {
        error,
        metadata,
        success_result: r.result.map(to_success),
    }
}

#[tonic::async_trait]
impl ClusterManagementService for MockRpcService {
    async fn update_cluster_conf(
        &self,
        _request: tonic::Request<ClusterConfChangeRequest>,
    ) -> std::result::Result<tonic::Response<ClusterConfUpdateResponse>, tonic::Status> {
        match &self.expected_update_cluster_conf_response {
            Some(Ok(response)) => Ok(tonic::Response::new(*response)),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock update_cluster_conf response set",
            )),
        }
    }
    async fn get_cluster_metadata(
        &self,
        _request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        match (&self.expected_metadata_response, self.server_port) {
            (Some(f), Some(port)) => f(port).map(tonic::Response::new).map_err(|e| e.clone()),
            _ => Err(tonic::Status::unimplemented(
                "Metadata response not configured",
            )),
        }
    }

    async fn join_cluster(
        &self,
        _request: tonic::Request<JoinRequest>,
    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
        Err(tonic::Status::unimplemented(
            "join_cluster removed from client API",
        ))
    }

    async fn discover_leader(
        &self,
        _request: tonic::Request<LeaderDiscoveryRequest>,
    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
        match &self.expected_discover_leader_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock discover_leader response set",
            )),
        }
    }
}

#[tonic::async_trait]
impl RaftClientService for MockRpcService {
    type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send>>;

    type WatchMembershipStream =
        Pin<Box<dyn Stream<Item = Result<MembershipSnapshot, tonic::Status>> + Send>>;

    async fn handle_client_write(
        &self,
        _request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        match &self.expected_client_propose_response {
            Some(Ok(response)) => Ok(tonic::Response::new(core_to_proto_response(
                response.clone(),
            ))),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock handle_client_write response set",
            )),
        }
    }

    async fn handle_client_read(
        &self,
        _request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        match &self.expected_client_read_response {
            Some(Ok(response)) => Ok(tonic::Response::new(core_to_proto_response(
                response.clone(),
            ))),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock handle_client_read response set",
            )),
        }
    }

    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        match &self.expected_watch_events {
            Some(Ok(events)) => {
                let items: Vec<Result<WatchResponse, tonic::Status>> =
                    events.iter().cloned().map(Ok).collect();
                let stream = futures::stream::iter(items);
                Ok(tonic::Response::new(Box::pin(stream)))
            }
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unimplemented("Watch not configured in mock")),
        }
    }

    async fn watch_membership(
        &self,
        _request: tonic::Request<WatchMembershipRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchMembershipStream>, tonic::Status> {
        match &self.expected_watch_membership_events {
            Some(Ok(snapshots)) => {
                let items: Vec<Result<MembershipSnapshot, tonic::Status>> =
                    snapshots.iter().cloned().map(Ok).collect();
                let stream = futures::stream::iter(items);
                Ok(tonic::Response::new(Box::pin(stream)))
            }
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unimplemented(
                "WatchMembership not configured in mock",
            )),
        }
    }

    async fn handle_client_scan(
        &self,
        _request: tonic::Request<ScanRequest>,
    ) -> std::result::Result<tonic::Response<ScanResponse>, tonic::Status> {
        match &self.expected_client_scan_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unimplemented("Scan not configured in mock")),
        }
    }
}