d-engine-server 0.2.3

Production-ready Raft consensus engine server and runtime
Documentation
use std::sync::Arc;

use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
use d_engine_proto::server::storage::SnapshotAck;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotResponse;
use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
use tokio_stream::StreamExt;
use tonic::Streaming;
use tracing::trace;

use super::snapshot::create_test_snapshot_stream;

#[derive(Clone, Default)]
pub struct MockRpcService {
    pub server_port: Option<u16>,
    // Expected responses for each method
    pub expected_vote_response: Option<Result<VoteResponse, tonic::Status>>,
    pub expected_append_entries_response: Option<Result<AppendEntriesResponse, tonic::Status>>,
    pub expected_update_cluster_conf_response:
        Option<Result<ClusterConfUpdateResponse, tonic::Status>>,
    pub expected_client_propose_response: Option<Result<ClientResponse, tonic::Status>>,
    pub expected_client_read_response: Option<Result<ClientResponse, tonic::Status>>,

    #[allow(clippy::type_complexity)]
    pub expected_metadata_response:
        Option<Arc<dyn Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync>>,

    pub expected_snapshot_response: Option<Result<SnapshotResponse, tonic::Status>>,
    pub expected_stream_snapshot_response: Option<Result<SnapshotChunk, tonic::Status>>,

    pub expected_join_cluster_response: Option<Result<JoinResponse, tonic::Status>>,
    pub expected_discover_leader_response: Option<Result<LeaderDiscoveryResponse, tonic::Status>>,
}
impl MockRpcService {
    pub fn with_metadata_response(
        mut self,
        f: impl Fn(u16) -> Result<ClusterMembership, tonic::Status> + Send + Sync + 'static,
    ) -> Self {
        self.expected_metadata_response = Some(Arc::new(f));
        self
    }

    pub fn set_port(
        &mut self,
        port: u16,
    ) {
        self.server_port = Some(port);
    }
}

#[tonic::async_trait]
impl RaftElectionService for MockRpcService {
    async fn request_vote(
        &self,
        _request: tonic::Request<VoteRequest>,
    ) -> std::result::Result<tonic::Response<VoteResponse>, tonic::Status> {
        match &self.expected_vote_response {
            Some(Ok(response)) => Ok(tonic::Response::new(*response)),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown("No mock vote response set")),
        }
    }
}

#[tonic::async_trait]
impl RaftReplicationService for MockRpcService {
    async fn append_entries(
        &self,
        _request: tonic::Request<AppendEntriesRequest>,
    ) -> std::result::Result<tonic::Response<AppendEntriesResponse>, tonic::Status> {
        match &self.expected_append_entries_response {
            Some(Ok(response)) => Ok(tonic::Response::new(*response)),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock append entries response set",
            )),
        }
    }
}

#[tonic::async_trait]
impl ClusterManagementService for MockRpcService {
    async fn update_cluster_conf(
        &self,
        _request: tonic::Request<ClusterConfChangeRequest>,
    ) -> std::result::Result<tonic::Response<ClusterConfUpdateResponse>, tonic::Status> {
        match &self.expected_update_cluster_conf_response {
            Some(Ok(response)) => Ok(tonic::Response::new(*response)),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock update_cluster_conf response set",
            )),
        }
    }
    async fn get_cluster_metadata(
        &self,
        _request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        match (&self.expected_metadata_response, self.server_port) {
            (Some(f), Some(port)) => f(port).map(tonic::Response::new).map_err(|e| e.clone()),
            _ => Err(tonic::Status::unimplemented(
                "Metadata response not configured",
            )),
        }
    }

    async fn join_cluster(
        &self,
        _request: tonic::Request<JoinRequest>,
    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
        match &self.expected_join_cluster_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown("No mock join_cluster response set")),
        }
    }

    async fn discover_leader(
        &self,
        _request: tonic::Request<LeaderDiscoveryRequest>,
    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
        match &self.expected_discover_leader_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock get_cluster_metadata response set",
            )),
        }
    }
}

#[tonic::async_trait]
impl RaftClientService for MockRpcService {
    type WatchStream = tokio_stream::wrappers::ReceiverStream<Result<WatchResponse, tonic::Status>>;

    async fn handle_client_write(
        &self,
        _request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        match &self.expected_client_propose_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock handle_client_write response set",
            )),
        }
    }

    async fn handle_client_read(
        &self,
        _request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        match &self.expected_client_read_response {
            Some(Ok(response)) => Ok(tonic::Response::new(response.clone())),
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock handle_client_read response set",
            )),
        }
    }

    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        // Mock implementation: return an empty stream
        let (_tx, rx) = tokio::sync::mpsc::channel(1);
        Ok(tonic::Response::new(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        ))
    }
}

#[tonic::async_trait]
impl SnapshotService for MockRpcService {
    type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;

    async fn stream_snapshot(
        &self,
        _request: tonic::Request<tonic::Streaming<SnapshotAck>>,
    ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
        match &self.expected_stream_snapshot_response {
            Some(Ok(response)) => {
                let streaming: Self::StreamSnapshotStream =
                    create_test_snapshot_stream(vec![response.clone()]);
                Ok(tonic::Response::new(streaming))
            }
            Some(Err(status)) => Err(status.clone()),
            None => Err(tonic::Status::unknown(
                "No mock install_snapshot response set",
            )),
        }
    }
    async fn install_snapshot(
        &self,
        request: tonic::Request<Streaming<SnapshotChunk>>,
    ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
        let mut stream = request.into_inner();

        while let Some(_chunk) = stream.next().await {
            trace!("install_snapshot receive chunk - ");
        }
        trace!("install_snapshot no more to receive!");

        match &self.expected_snapshot_response {
            Some(Ok(response)) => return Ok(tonic::Response::new(*response)),
            Some(Err(status)) => return Err(status.clone()),
            None => {
                return Err(tonic::Status::unknown(
                    "No mock install_snapshot response set",
                ));
            }
        }
    }
}