use super::{DynamicRequest, DynamicResponse, GrancClient, OnlineWithoutReflection};
use crate::{BoxError, client::OfflineReflectionState, grpc::client::GrpcRequestError};
use futures_util::{Stream, StreamExt, stream};
use http_body::Body as HttpBody;
use std::fmt::Debug;
#[derive(Debug, thiserror::Error)]
pub enum DynamicCallError {
#[error("Invalid input: '{0}'")]
InvalidInput(String),
#[error("Service '{0}' not found")]
ServiceNotFound(String),
#[error("Method '{0}' not found")]
MethodNotFound(String),
#[error("gRPC client request error: '{0}'")]
GrpcRequestError(#[from] GrpcRequestError),
}
impl<S> GrancClient<OnlineWithoutReflection<S>>
where
S: tonic::client::GrpcService<tonic::body::Body> + Clone,
S::ResponseBody: HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
<S::ResponseBody as HttpBody>::Error: Into<BoxError> + Send,
{
pub async fn dynamic(
&mut self,
request: DynamicRequest,
) -> Result<DynamicResponse, DynamicCallError> {
let method = self
.state
.descriptor_pool()
.get_service_by_name(&request.service)
.ok_or_else(|| DynamicCallError::ServiceNotFound(request.service.clone()))?
.methods()
.find(|m| m.name() == request.method)
.ok_or_else(|| DynamicCallError::MethodNotFound(request.method.clone()))?;
match (method.is_client_streaming(), method.is_server_streaming()) {
(false, false) => {
let result = self
.state
.grpc_client
.unary(method, request.body, request.headers)
.await?;
Ok(DynamicResponse::Unary(result))
}
(false, true) => match self
.state
.grpc_client
.server_streaming(method, request.body, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
},
(true, false) => {
let input_stream =
json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?;
let result = self
.state
.grpc_client
.client_streaming(method, input_stream, request.headers)
.await?;
Ok(DynamicResponse::Unary(result))
}
(true, true) => {
let input_stream =
json_array_to_stream(request.body).map_err(DynamicCallError::InvalidInput)?;
match self
.state
.grpc_client
.bidirectional_streaming(method, input_stream, request.headers)
.await?
{
Ok(stream) => Ok(DynamicResponse::Streaming(stream.boxed())),
Err(status) => Ok(DynamicResponse::Streaming(
stream::once(async { Err(status) }).boxed(),
)),
}
}
}
}
}
fn json_array_to_stream(
json: serde_json::Value,
) -> Result<impl Stream<Item = serde_json::Value> + Send + 'static, String> {
match json {
serde_json::Value::Array(items) => Ok(tokio_stream::iter(items)),
_ => Err("Client streaming requires a JSON Array body".to_string()),
}
}