use tonic::transport::{Channel, Endpoint};
use crate::rpc::RpcClientConfig;
#[cfg(feature = "discovery")]
use crate::rpc::{
balancer::{RpcEndpoint, RpcEndpointSelector},
discovery::resolve_service_endpoint,
};
/// Error returned when connecting to a service located through discovery.
///
/// Only compiled when the `discovery` feature is enabled. Both variants are
/// `transparent`, so `Display` and `source()` are forwarded to the wrapped
/// error, and both derive `From` so the inner errors convert with `?`.
#[cfg(feature = "discovery")]
#[derive(Debug, thiserror::Error)]
pub enum RpcConnectError {
/// A discovery-layer failure, wrapped unchanged.
#[error(transparent)]
Discovery(#[from] crate::discovery::DiscoveryError),
/// A tonic transport failure, wrapped unchanged.
#[error(transparent)]
Transport(#[from] tonic::transport::Error),
}
/// Builds a tonic [`Endpoint`] from the address and timeouts in `config`.
///
/// The endpoint targets `config.endpoint`; `config.connect_timeout` is applied
/// as the connect timeout and `config.request_timeout` as the per-request
/// timeout. No connection is opened here.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] produced by
/// [`Endpoint::from_shared`] when it rejects `config.endpoint`.
pub fn endpoint_from_config(config: &RpcClientConfig) -> Result<Endpoint, tonic::transport::Error> {
    let base = Endpoint::from_shared(config.endpoint.clone())?;
    let configured = base
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout);
    Ok(configured)
}
/// Builds an endpoint from `config` and opens a [`Channel`] to it.
///
/// # Errors
///
/// Returns a [`tonic::transport::Error`] if the endpoint cannot be built by
/// [`endpoint_from_config`] or if the connection attempt fails.
pub async fn connect_channel(config: &RpcClientConfig) -> Result<Channel, tonic::transport::Error> {
    let target = endpoint_from_config(config)?;
    target.connect().await
}
/// Builds a tonic [`Endpoint`] for an [`RpcEndpoint`], taking only the
/// timeouts from `config`.
///
/// The address comes from `endpoint.uri`; `config.endpoint` is not consulted.
/// `config.connect_timeout` is applied as the connect timeout and
/// `config.request_timeout` as the per-request timeout. No connection is
/// opened here.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] produced by
/// [`Endpoint::from_shared`] when it rejects `endpoint.uri`.
#[cfg(feature = "discovery")]
pub fn endpoint_from_rpc_endpoint(
    endpoint: &RpcEndpoint,
    config: &RpcClientConfig,
) -> Result<Endpoint, tonic::transport::Error> {
    let base = Endpoint::from_shared(endpoint.uri.clone())?;
    let configured = base
        .connect_timeout(config.connect_timeout)
        .timeout(config.request_timeout);
    Ok(configured)
}
/// Resolves `service` to a single endpoint and opens a [`Channel`] to it.
///
/// The endpoint is obtained from [`resolve_service_endpoint`] using the given
/// `discovery` source and `selector`. It is then turned into a tonic endpoint
/// with the timeouts from `config` and connected.
///
/// # Errors
///
/// Returns [`RpcConnectError`]: a resolution failure is converted through
/// `From` (presumably into `RpcConnectError::Discovery` — confirm against
/// `resolve_service_endpoint`'s error type), and endpoint construction or
/// connection failures become `RpcConnectError::Transport`.
#[cfg(feature = "discovery")]
pub async fn connect_discovered_channel<D, S>(
    discovery: &D,
    selector: &S,
    service: &str,
    config: &RpcClientConfig,
) -> Result<Channel, RpcConnectError>
where
    D: crate::discovery::Discovery + ?Sized,
    S: RpcEndpointSelector + ?Sized,
{
    let selected = resolve_service_endpoint(discovery, selector, service).await?;
    let target = endpoint_from_rpc_endpoint(&selected, config)?;
    let channel = target.connect().await?;
    Ok(channel)
}
#[cfg(test)]
mod tests {
    use super::endpoint_from_config;
    use crate::rpc::RpcClientConfig;

    /// A plain `http://` address with host and port must yield an endpoint.
    #[test]
    fn endpoint_accepts_http_uri() {
        let config = RpcClientConfig::new("http://127.0.0.1:50051");
        let built = endpoint_from_config(&config);
        assert!(built.is_ok());
    }
}
/// Holds an [`RpcClientConfig`] and exposes connection and request-context
/// helpers built on it.
#[derive(Debug, Clone)]
pub struct RpcClientBuilder {
// Configuration used by `connect`; readable through `config()`.
config: RpcClientConfig,
}
impl RpcClientBuilder {
pub fn new(config: RpcClientConfig) -> Self {
Self { config }
}
pub fn config(&self) -> &RpcClientConfig {
&self.config
}
pub async fn connect(&self) -> Result<Channel, tonic::transport::Error> {
connect_channel(&self.config).await
}
pub fn inject_request_context<T>(
&self,
context: &crate::layer::context::RequestContext,
request: &mut tonic::Request<T>,
) -> Result<(), tonic::metadata::errors::InvalidMetadataValue> {
let _ = &self.config;
context.inject_tonic_metadata(request.metadata_mut())?;
context.insert_tonic_extensions(request);
Ok(())
}
pub async fn scope_request_id<T>(
&self,
request_id: impl Into<String>,
future: impl std::future::Future<Output = T>,
) -> T {
let _ = &self.config;
crate::layer::context::scope_request_id(request_id, future).await
}
}