Skip to main content

rs_zero/rpc/
client.rs

1use tonic::transport::{Channel, Endpoint};
2
3use crate::rpc::RpcClientConfig;
4#[cfg(feature = "discovery")]
5use crate::rpc::{
6    balancer::{RpcEndpoint, RpcEndpointSelector},
7    discovery::resolve_service_endpoint,
8};
9
/// Failure modes of the discovery-backed connect path.
///
/// Both variants are declared `#[error(transparent)]`, so the wrapped error
/// supplies the message, and `#[from]` lets `?` convert either underlying
/// error into this type.
#[cfg(feature = "discovery")]
#[derive(Debug, thiserror::Error)]
pub enum RpcConnectError {
    /// The discovery layer reported an error while resolving an endpoint.
    #[error(transparent)]
    Discovery(#[from] crate::discovery::DiscoveryError),
    /// Tonic could not build the endpoint or establish the connection.
    #[error(transparent)]
    Transport(#[from] tonic::transport::Error),
}
21
22/// Builds a tonic endpoint from rs-zero client config.
23pub fn endpoint_from_config(config: &RpcClientConfig) -> Result<Endpoint, tonic::transport::Error> {
24    Endpoint::from_shared(config.endpoint.clone()).map(|endpoint| {
25        endpoint
26            .connect_timeout(config.connect_timeout)
27            .timeout(config.request_timeout)
28    })
29}
30
31/// Connects a tonic channel from rs-zero client config.
32pub async fn connect_channel(config: &RpcClientConfig) -> Result<Channel, tonic::transport::Error> {
33    endpoint_from_config(config)?.connect().await
34}
35
/// Creates a tonic [`Endpoint`] for the URI carried by a discovered endpoint.
///
/// The address comes from `endpoint.uri`; the connect and request timeouts
/// come from `config`. No connection is attempted here.
///
/// # Errors
///
/// Returns the transport error from [`Endpoint::from_shared`] when
/// `endpoint.uri` is rejected.
#[cfg(feature = "discovery")]
pub fn endpoint_from_rpc_endpoint(
    endpoint: &RpcEndpoint,
    config: &RpcClientConfig,
) -> Result<Endpoint, tonic::transport::Error> {
    let builder = Endpoint::from_shared(endpoint.uri.clone())?;
    Ok(builder
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout))
}
48
/// Looks up `service` through discovery and connects a channel to the result.
///
/// `resolve_service_endpoint` is called with `discovery` and `selector` to
/// obtain one endpoint. A tonic endpoint is then built from it using the
/// timeouts in `config`, and finally connected.
///
/// # Errors
///
/// Any failure from resolution, endpoint construction, or connecting is
/// converted into [`RpcConnectError`] and returned.
#[cfg(feature = "discovery")]
pub async fn connect_discovered_channel<D, S>(
    discovery: &D,
    selector: &S,
    service: &str,
    config: &RpcClientConfig,
) -> Result<Channel, RpcConnectError>
where
    D: crate::discovery::Discovery + ?Sized,
    S: RpcEndpointSelector + ?Sized,
{
    let resolved = resolve_service_endpoint(discovery, selector, service).await?;
    let endpoint = endpoint_from_rpc_endpoint(&resolved, config)?;
    let channel = endpoint.connect().await?;
    Ok(channel)
}
66
#[cfg(test)]
mod tests {
    use super::endpoint_from_config;
    use crate::rpc::RpcClientConfig;

    /// A plain `http://` URI with host and port must yield an endpoint.
    #[test]
    fn endpoint_accepts_http_uri() {
        let config = RpcClientConfig::new("http://127.0.0.1:50051");
        let built = endpoint_from_config(&config);

        assert!(built.is_ok());
    }
}