rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use tonic::transport::{Channel, Endpoint};

use crate::rpc::RpcClientConfig;
#[cfg(feature = "discovery")]
use crate::rpc::{
    balancer::{RpcEndpoint, RpcEndpointSelector},
    discovery::resolve_service_endpoint,
};

/// Errors returned while resolving and connecting RPC clients.
///
/// Each variant wraps its source error transparently, so the message and
/// source chain are those of the wrapped error. The `#[from]` attributes
/// generate the `From` conversions that let `?` lift either error into this
/// type.
#[cfg(feature = "discovery")]
#[derive(Debug, thiserror::Error)]
pub enum RpcConnectError {
    /// Discovery returned no usable endpoint or failed.
    #[error(transparent)]
    Discovery(#[from] crate::discovery::DiscoveryError),
    /// Tonic transport failed to build or connect the endpoint.
    #[error(transparent)]
    Transport(#[from] tonic::transport::Error),
}

/// Creates a tonic `Endpoint` for the address held in `config.endpoint`.
///
/// The connect timeout and request timeout from `config` are applied to the
/// endpoint before it is returned. No connection is attempted here.
///
/// # Errors
///
/// Returns the transport error reported by `Endpoint::from_shared` when the
/// configured address cannot be turned into an endpoint.
pub fn endpoint_from_config(config: &RpcClientConfig) -> Result<Endpoint, tonic::transport::Error> {
    let base = Endpoint::from_shared(config.endpoint.clone())?;
    Ok(base
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout))
}

/// Builds an endpoint from `config` and opens a tonic channel to it.
///
/// # Errors
///
/// Returns the transport error if the endpoint cannot be built from the
/// configuration or if connecting fails.
pub async fn connect_channel(config: &RpcClientConfig) -> Result<Channel, tonic::transport::Error> {
    let target = endpoint_from_config(config)?;
    target.connect().await
}

/// Creates a tonic `Endpoint` for the URI of a discovered RPC endpoint.
///
/// The address comes from `endpoint.uri`; the connect timeout and request
/// timeout come from `config`. No connection is attempted here.
///
/// # Errors
///
/// Returns the transport error reported by `Endpoint::from_shared` when the
/// discovered URI cannot be turned into an endpoint.
#[cfg(feature = "discovery")]
pub fn endpoint_from_rpc_endpoint(
    endpoint: &RpcEndpoint,
    config: &RpcClientConfig,
) -> Result<Endpoint, tonic::transport::Error> {
    let base = Endpoint::from_shared(endpoint.uri.clone())?;
    Ok(base
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout))
}

/// Looks up `service` through discovery, picks one endpoint, and connects to it.
///
/// The lookup and selection are delegated to `resolve_service_endpoint` using
/// the given `discovery` and `selector`. The chosen endpoint is then turned
/// into a tonic endpoint with the timeouts from `config` and connected.
///
/// # Errors
///
/// Returns [`RpcConnectError`] when endpoint resolution fails, when the
/// resolved URI cannot be turned into a tonic endpoint, or when the
/// connection attempt fails.
#[cfg(feature = "discovery")]
pub async fn connect_discovered_channel<D, S>(
    discovery: &D,
    selector: &S,
    service: &str,
    config: &RpcClientConfig,
) -> Result<Channel, RpcConnectError>
where
    D: crate::discovery::Discovery + ?Sized,
    S: RpcEndpointSelector + ?Sized,
{
    // Step 1: resolve and select a single endpoint for the service.
    let resolved = resolve_service_endpoint(discovery, selector, service).await?;
    // Step 2: translate it into a tonic endpoint carrying the configured timeouts.
    let target = endpoint_from_rpc_endpoint(&resolved, config)?;
    // Step 3: establish the channel.
    let channel = target.connect().await?;
    Ok(channel)
}

#[cfg(test)]
mod tests {
    use super::endpoint_from_config;
    use crate::rpc::RpcClientConfig;

    /// A plain `http://` address with host and port must yield an endpoint.
    #[test]
    fn endpoint_accepts_http_uri() {
        let built = endpoint_from_config(&RpcClientConfig::new("http://127.0.0.1:50051"));

        assert!(built.is_ok());
    }
}

/// Tower-first RPC client builder.
///
/// Holds the client configuration it was constructed with; the configuration
/// supplies the endpoint address and timeouts when a channel is connected.
#[derive(Debug, Clone)]
pub struct RpcClientBuilder {
    /// Client configuration used for every connection made by this builder.
    config: RpcClientConfig,
}

impl RpcClientBuilder {
    /// Creates a builder from client configuration.
    pub fn new(config: RpcClientConfig) -> Self {
        Self { config }
    }

    /// Returns the builder configuration.
    pub fn config(&self) -> &RpcClientConfig {
        &self.config
    }

    /// Connects a tonic channel using the configured endpoint and timeouts.
    pub async fn connect(&self) -> Result<Channel, tonic::transport::Error> {
        connect_channel(&self.config).await
    }

    /// Injects request context into an outgoing tonic request.
    pub fn inject_request_context<T>(
        &self,
        context: &crate::layer::context::RequestContext,
        request: &mut tonic::Request<T>,
    ) -> Result<(), tonic::metadata::errors::InvalidMetadataValue> {
        let _ = &self.config;
        context.inject_tonic_metadata(request.metadata_mut())?;
        context.insert_tonic_extensions(request);
        Ok(())
    }

    /// Runs a future with the provided request id available to outgoing RPC calls.
    pub async fn scope_request_id<T>(
        &self,
        request_id: impl Into<String>,
        future: impl std::future::Future<Output = T>,
    ) -> T {
        let _ = &self.config;
        crate::layer::context::scope_request_id(request_id, future).await
    }
}