Skip to main content

rs_zero/rpc/
client.rs

1use tonic::transport::{Channel, Endpoint};
2
3use crate::rpc::RpcClientConfig;
4#[cfg(feature = "discovery")]
5use crate::rpc::{
6    balancer::{RpcEndpoint, RpcEndpointSelector},
7    discovery::resolve_service_endpoint,
8};
9
10/// Errors returned while resolving and connecting RPC clients.
11#[cfg(feature = "discovery")]
12#[derive(Debug, thiserror::Error)]
13pub enum RpcConnectError {
14    /// Discovery returned no usable endpoint or failed.
15    #[error(transparent)]
16    Discovery(#[from] crate::discovery::DiscoveryError),
17    /// Tonic transport failed to build or connect the endpoint.
18    #[error(transparent)]
19    Transport(#[from] tonic::transport::Error),
20}
21
22/// Builds a tonic endpoint from rs-zero client config.
23pub fn endpoint_from_config(config: &RpcClientConfig) -> Result<Endpoint, tonic::transport::Error> {
24    Endpoint::from_shared(config.endpoint.clone()).map(|endpoint| {
25        endpoint
26            .connect_timeout(config.connect_timeout)
27            .timeout(config.request_timeout)
28    })
29}
30
31/// Connects a tonic channel from rs-zero client config.
32pub async fn connect_channel(config: &RpcClientConfig) -> Result<Channel, tonic::transport::Error> {
33    endpoint_from_config(config)?.connect().await
34}
35
36/// Builds a tonic endpoint from a discovered RPC endpoint.
37#[cfg(feature = "discovery")]
38pub fn endpoint_from_rpc_endpoint(
39    endpoint: &RpcEndpoint,
40    config: &RpcClientConfig,
41) -> Result<Endpoint, tonic::transport::Error> {
42    Endpoint::from_shared(endpoint.uri.clone()).map(|endpoint| {
43        endpoint
44            .connect_timeout(config.connect_timeout)
45            .timeout(config.request_timeout)
46    })
47}
48
49/// Resolves one endpoint through discovery and connects a tonic channel.
50#[cfg(feature = "discovery")]
51pub async fn connect_discovered_channel<D, S>(
52    discovery: &D,
53    selector: &S,
54    service: &str,
55    config: &RpcClientConfig,
56) -> Result<Channel, RpcConnectError>
57where
58    D: crate::discovery::Discovery + ?Sized,
59    S: RpcEndpointSelector + ?Sized,
60{
61    let endpoint = resolve_service_endpoint(discovery, selector, service).await?;
62    Ok(endpoint_from_rpc_endpoint(&endpoint, config)?
63        .connect()
64        .await?)
65}
66
67#[cfg(test)]
68mod tests {
69    use super::endpoint_from_config;
70    use crate::rpc::RpcClientConfig;
71
72    #[test]
73    fn endpoint_accepts_http_uri() {
74        let config = RpcClientConfig::new("http://127.0.0.1:50051");
75
76        assert!(endpoint_from_config(&config).is_ok());
77    }
78}
79
80/// Tower-first RPC client builder.
81#[derive(Debug, Clone)]
82pub struct RpcClientBuilder {
83    config: RpcClientConfig,
84}
85
86impl RpcClientBuilder {
87    /// Creates a builder from client configuration.
88    pub fn new(config: RpcClientConfig) -> Self {
89        Self { config }
90    }
91
92    /// Returns the builder configuration.
93    pub fn config(&self) -> &RpcClientConfig {
94        &self.config
95    }
96
97    /// Connects a tonic channel using the configured endpoint and timeouts.
98    pub async fn connect(&self) -> Result<Channel, tonic::transport::Error> {
99        connect_channel(&self.config).await
100    }
101
102    /// Injects request context into an outgoing tonic request.
103    pub fn inject_request_context<T>(
104        &self,
105        context: &crate::layer::context::RequestContext,
106        request: &mut tonic::Request<T>,
107    ) -> Result<(), tonic::metadata::errors::InvalidMetadataValue> {
108        let _ = &self.config;
109        context.inject_tonic_metadata(request.metadata_mut())?;
110        context.insert_tonic_extensions(request);
111        Ok(())
112    }
113
114    /// Runs a future with the provided request id available to outgoing RPC calls.
115    pub async fn scope_request_id<T>(
116        &self,
117        request_id: impl Into<String>,
118        future: impl std::future::Future<Output = T>,
119    ) -> T {
120        let _ = &self.config;
121        crate::layer::context::scope_request_id(request_id, future).await
122    }
123}