1use tonic::transport::{Channel, Endpoint};
2
3use crate::rpc::RpcClientConfig;
4#[cfg(feature = "discovery")]
5use crate::rpc::{
6 balancer::{RpcEndpoint, RpcEndpointSelector},
7 discovery::resolve_service_endpoint,
8};
9
/// Error returned by [`connect_discovered_channel`].
///
/// Both variants are `transparent`: their `Display` output and error source
/// are forwarded from the wrapped error, and `#[from]` lets `?` convert the
/// wrapped error types into this enum.
#[cfg(feature = "discovery")]
#[derive(Debug, thiserror::Error)]
pub enum RpcConnectError {
    /// A discovery-layer error, converted via `From`.
    #[error(transparent)]
    Discovery(#[from] crate::discovery::DiscoveryError),
    /// A tonic transport error, raised while building the endpoint or while
    /// connecting to it.
    #[error(transparent)]
    Transport(#[from] tonic::transport::Error),
}
21
22pub fn endpoint_from_config(config: &RpcClientConfig) -> Result<Endpoint, tonic::transport::Error> {
24 Endpoint::from_shared(config.endpoint.clone()).map(|endpoint| {
25 endpoint
26 .connect_timeout(config.connect_timeout)
27 .timeout(config.request_timeout)
28 })
29}
30
31pub async fn connect_channel(config: &RpcClientConfig) -> Result<Channel, tonic::transport::Error> {
33 endpoint_from_config(config)?.connect().await
34}
35
/// Builds a tonic [`Endpoint`] for an [`RpcEndpoint`].
///
/// The address comes from `endpoint.uri`; the connect and request timeouts
/// come from `config`, exactly as in [`endpoint_from_config`]. No connection
/// is attempted here.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] produced by
/// [`Endpoint::from_shared`] when it rejects `endpoint.uri`.
#[cfg(feature = "discovery")]
pub fn endpoint_from_rpc_endpoint(
    endpoint: &RpcEndpoint,
    config: &RpcClientConfig,
) -> Result<Endpoint, tonic::transport::Error> {
    let target = Endpoint::from_shared(endpoint.uri.clone())?;

    Ok(target
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout))
}
48
/// Resolves an endpoint for `service` and connects a [`Channel`] to it.
///
/// The endpoint is obtained from [`resolve_service_endpoint`] using
/// `discovery` and `selector`; timeouts are taken from `config` via
/// [`endpoint_from_rpc_endpoint`].
///
/// # Errors
///
/// Returns [`RpcConnectError`] when resolution fails, when the resolved
/// endpoint cannot be turned into a tonic endpoint, or when connecting fails.
#[cfg(feature = "discovery")]
pub async fn connect_discovered_channel<D, S>(
    discovery: &D,
    selector: &S,
    service: &str,
    config: &RpcClientConfig,
) -> Result<Channel, RpcConnectError>
where
    D: crate::discovery::Discovery + ?Sized,
    S: RpcEndpointSelector + ?Sized,
{
    let resolved = resolve_service_endpoint(discovery, selector, service).await?;
    let target = endpoint_from_rpc_endpoint(&resolved, config)?;
    let channel = target.connect().await?;

    Ok(channel)
}
66
67#[cfg(test)]
68mod tests {
69 use super::endpoint_from_config;
70 use crate::rpc::RpcClientConfig;
71
72 #[test]
73 fn endpoint_accepts_http_uri() {
74 let config = RpcClientConfig::new("http://127.0.0.1:50051");
75
76 assert!(endpoint_from_config(&config).is_ok());
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct RpcClientBuilder {
83 config: RpcClientConfig,
84}
85
86impl RpcClientBuilder {
87 pub fn new(config: RpcClientConfig) -> Self {
89 Self { config }
90 }
91
92 pub fn config(&self) -> &RpcClientConfig {
94 &self.config
95 }
96
97 pub async fn connect(&self) -> Result<Channel, tonic::transport::Error> {
99 connect_channel(&self.config).await
100 }
101
102 pub fn inject_request_context<T>(
104 &self,
105 context: &crate::layer::context::RequestContext,
106 request: &mut tonic::Request<T>,
107 ) -> Result<(), tonic::metadata::errors::InvalidMetadataValue> {
108 let _ = &self.config;
109 context.inject_tonic_metadata(request.metadata_mut())?;
110 context.insert_tonic_extensions(request);
111 Ok(())
112 }
113
114 pub async fn scope_request_id<T>(
116 &self,
117 request_id: impl Into<String>,
118 future: impl std::future::Future<Output = T>,
119 ) -> T {
120 let _ = &self.config;
121 crate::layer::context::scope_request_id(request_id, future).await
122 }
123}