use super::codec::JsonCodec;
use crate::BoxError;
use futures_util::Stream;
use http_body::Body as HttpBody;
use prost_reflect::MethodDescriptor;
use std::str::FromStr;
use tonic::{
client::GrpcService,
metadata::{
MetadataKey, MetadataValue,
errors::{InvalidMetadataKey, InvalidMetadataValue},
},
transport::Channel,
};
#[derive(thiserror::Error, Debug)]
pub enum GrpcRequestError {
#[error("Internal error, the client was not ready: '{0}'")]
ClientNotReady(#[source] BoxError),
#[error("Invalid metadata (header) key '{key}': '{source}'")]
InvalidMetadataKey {
key: String,
source: InvalidMetadataKey,
},
#[error("Invalid metadata (header) value for key '{key}': '{source}'")]
InvalidMetadataValue {
key: String,
source: InvalidMetadataValue,
},
}
#[derive(Debug, Clone)]
pub struct GrpcClient<S = Channel> {
client: tonic::client::Grpc<S>,
}
impl<S> GrpcClient<S>
where
S: GrpcService<tonic::body::Body>,
S::Error: Into<BoxError>,
S::ResponseBody: HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<S::ResponseBody as HttpBody>::Error: Into<BoxError> + Send,
{
pub fn new(service: S) -> Self {
let client = tonic::client::Grpc::new(service);
Self { client }
}
pub async fn unary(
&mut self,
method: MethodDescriptor,
payload: serde_json::Value,
headers: Vec<(String, String)>,
) -> Result<Result<serde_json::Value, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
.map_err(|e| GrpcRequestError::ClientNotReady(e.into()))?;
let codec = JsonCodec::new(method.input(), method.output());
let path = http_path(&method);
let request = build_request(payload, headers)?;
match self.client.unary(request, path, codec).await {
Ok(response) => Ok(Ok(response.into_inner())),
Err(status) => Ok(Err(status)),
}
}
pub async fn server_streaming(
&mut self,
method: MethodDescriptor,
payload: serde_json::Value,
headers: Vec<(String, String)>,
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
.map_err(|e| GrpcRequestError::ClientNotReady(e.into()))?;
let codec = JsonCodec::new(method.input(), method.output());
let path = http_path(&method);
let request = build_request(payload, headers)?;
match self.client.server_streaming(request, path, codec).await {
Ok(response) => Ok(Ok(response.into_inner())),
Err(status) => Ok(Err(status)),
}
}
pub async fn client_streaming(
&mut self,
method: MethodDescriptor,
payload_stream: impl Stream<Item = serde_json::Value> + Send + 'static,
headers: Vec<(String, String)>,
) -> Result<Result<serde_json::Value, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
.map_err(|e| GrpcRequestError::ClientNotReady(e.into()))?;
let codec = JsonCodec::new(method.input(), method.output());
let path = http_path(&method);
let request = build_request(payload_stream, headers)?;
match self.client.client_streaming(request, path, codec).await {
Ok(response) => Ok(Ok(response.into_inner())),
Err(status) => Ok(Err(status)),
}
}
pub async fn bidirectional_streaming(
&mut self,
method: MethodDescriptor,
payload_stream: impl Stream<Item = serde_json::Value> + Send + 'static,
headers: Vec<(String, String)>,
) -> Result<Result<tonic::Streaming<serde_json::Value>, tonic::Status>, GrpcRequestError> {
self.client
.ready()
.await
.map_err(|e| GrpcRequestError::ClientNotReady(e.into()))?;
let codec = JsonCodec::new(method.input(), method.output());
let path = http_path(&method);
let request = build_request(payload_stream, headers)?;
match self.client.streaming(request, path, codec).await {
Ok(response) => Ok(Ok(response.into_inner())),
Err(status) => Ok(Err(status)),
}
}
}
fn http_path(method: &MethodDescriptor) -> http::uri::PathAndQuery {
let path = format!("/{}/{}", method.parent_service().full_name(), method.name());
http::uri::PathAndQuery::from_str(&path).expect("valid gRPC path")
}
fn build_request<T>(
payload: T,
headers: Vec<(String, String)>,
) -> Result<tonic::Request<T>, GrpcRequestError> {
let mut request = tonic::Request::new(payload);
for (k, v) in headers {
let key =
MetadataKey::from_str(&k).map_err(|source| GrpcRequestError::InvalidMetadataKey {
key: k.clone(),
source,
})?;
let val = MetadataValue::from_str(&v)
.map_err(|source| GrpcRequestError::InvalidMetadataValue { key: k, source })?;
request.metadata_mut().insert(key, val);
}
Ok(request)
}