use std::convert::Infallible;
use std::rc::Rc;
use std::time::Duration;
use crate::proxy_wasm::types::Status;
use crate::client::Service;
use crate::extract::context::ConfigureContext;
use crate::extract::{extractability, FromContext};
use crate::host::grpc::GrpcHost;
use crate::http_constants::DEFAULT_TIMEOUT;
use crate::reactor::root::RootReactor;
use crate::grpc::call::GrpcCall;
use crate::grpc::error::GrpcCallError;
use crate::grpc::transport::GrpcCallId;
use super::codec::{BytesCodec, Codec, Encoder};
use super::GrpcProxyError;
/// Entry point for building outbound gRPC calls.
///
/// Holds shared handles to the root reactor and the gRPC host; both are
/// cloned into every builder started with `request`.
pub struct GrpcClient {
/// Shared root reactor, handed on to each builder and finally to the call.
reactor: Rc<RootReactor>,
/// Shared host abstraction that the call builder dispatches through.
grpc_host: Rc<dyn GrpcHost>,
}
/// Builds a [`GrpcClient`] from the configure-time context.
///
/// Extraction only clones the shared handles stored in the context, so it
/// cannot fail (`Error = Infallible`).
impl FromContext<ConfigureContext, extractability::Transitive> for GrpcClient {
    type Error = Infallible;

    /// Clones the context's root reactor and gRPC host into a new client.
    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        let reactor = Rc::clone(&context.root_reactor);
        let grpc_host = Rc::clone(&context.grpc_host);

        Ok(Self { reactor, grpc_host })
    }
}
impl GrpcClient {
pub fn request<'a>(&'a self, upstream: &'a Service) -> GrpcServiceBuilder<'a> {
GrpcServiceBuilder::new(self.reactor.clone(), self.grpc_host.clone(), upstream)
}
}
/// First builder stage: the upstream is known, the gRPC service name is not.
///
/// Call `service` to advance to the method-selection stage.
pub struct GrpcServiceBuilder<'a> {
/// Shared root reactor carried through the builder chain.
reactor: Rc<RootReactor>,
/// Shared gRPC host carried through the builder chain.
host: Rc<dyn GrpcHost>,
/// Upstream service the call will be sent to.
upstream: &'a Service,
}
impl<'a> GrpcServiceBuilder<'a> {
    /// Creates the first builder stage from its three components.
    pub(super) fn new(
        reactor: Rc<RootReactor>,
        host: Rc<dyn GrpcHost>,
        upstream: &'a Service,
    ) -> Self {
        Self { reactor, host, upstream }
    }

    /// Selects the gRPC service name and moves on to method selection.
    ///
    /// Consumes this builder; everything collected so far is moved into
    /// the returned [`GrpcMethodBuilder`].
    pub fn service(self, service: &'a str) -> GrpcMethodBuilder<'a> {
        let Self {
            reactor,
            host,
            upstream,
        } = self;

        GrpcMethodBuilder {
            reactor,
            host,
            upstream,
            service,
        }
    }
}
/// Second builder stage: upstream and service name are known, the method
/// name is not.
///
/// Call `method` to obtain a [`GrpcCallBuilder`].
pub struct GrpcMethodBuilder<'a> {
/// Shared root reactor carried through the builder chain.
reactor: Rc<RootReactor>,
/// Shared gRPC host carried through the builder chain.
host: Rc<dyn GrpcHost>,
/// Upstream service the call will be sent to.
upstream: &'a Service,
/// gRPC service name chosen in the previous stage.
service: &'a str,
}
impl<'a> GrpcMethodBuilder<'a> {
    /// Selects the gRPC method name and produces a call builder.
    ///
    /// The call builder starts with no initial metadata, a timeout of
    /// `DEFAULT_TIMEOUT`, and the [`BytesCodec`] codec.
    pub fn method(self, method: &'a str) -> GrpcCallBuilder<'a, BytesCodec> {
        let Self {
            reactor,
            host,
            upstream,
            service,
        } = self;

        GrpcCallBuilder {
            reactor,
            host,
            upstream,
            service,
            method,
            initial_metadata: Vec::new(),
            timeout: DEFAULT_TIMEOUT,
            codec: BytesCodec,
        }
    }
}
/// Final builder stage: target, service and method are fixed; metadata,
/// timeout and codec can still be adjusted before the call is sent.
///
/// `C` is the codec whose encoder serializes the outgoing message and whose
/// decoder is handed to the resulting call.
pub struct GrpcCallBuilder<'a, C> {
/// Shared root reactor, passed to the call when it is created.
reactor: Rc<RootReactor>,
/// Shared gRPC host used to dispatch the call.
host: Rc<dyn GrpcHost>,
/// Upstream service the call will be sent to.
upstream: &'a Service,
/// gRPC service name.
service: &'a str,
/// gRPC method name.
method: &'a str,
/// Initial metadata entries as `(name, value bytes)` pairs.
initial_metadata: Vec<(&'a str, &'a [u8])>,
/// Timeout passed to the host when the call is dispatched.
timeout: Duration,
/// Codec that is split into an encoder and a decoder when sending.
codec: C,
}
impl<'a, C> GrpcCallBuilder<'a, C>
where
    C: Codec,
{
    /// Replaces the whole list of initial metadata entries.
    ///
    /// Entries set or added earlier are discarded.
    pub fn initial_metadata(mut self, initial_metadata: Vec<(&'a str, &'a [u8])>) -> Self {
        self.initial_metadata = initial_metadata;
        self
    }

    /// Appends one `(name, values)` entry to the initial metadata.
    pub fn add_initial_metadata(mut self, name: &'a str, values: &'a [u8]) -> Self {
        let entry = (name, values);
        self.initial_metadata.push(entry);
        self
    }

    /// Overrides the timeout used when the call is dispatched.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Encodes `message` and dispatches the call through the host.
    ///
    /// Failures are not returned directly. The outcome of encoding and
    /// dispatching is handed to the returned [`GrpcCall`] as a `Result`:
    ///
    /// * an encoder failure becomes `GrpcCallError::Encode`;
    /// * a host status of `Status::ParseFailure` becomes
    ///   `GrpcCallError::InvalidUpstream` carrying the upstream cluster name;
    /// * any other host status becomes `GrpcCallError::Proxy`.
    pub fn send(self, message: &<C::Encoder as Encoder>::Input) -> GrpcCall<C::Decoder> {
        let Self {
            reactor,
            host,
            upstream,
            service,
            method,
            initial_metadata,
            timeout,
            codec,
        } = self;

        let (mut encoder, decoder) = codec.into_components();
        let cluster = upstream.cluster_name();

        // Guard clause: if the message cannot be encoded, nothing is dispatched.
        let payload = match encoder.encode(message) {
            Ok(bytes) => bytes,
            Err(e) => {
                return GrpcCall::new(
                    reactor,
                    host,
                    decoder,
                    Err(GrpcCallError::Encode(e.into())),
                );
            }
        };

        let dispatched = host
            .dispatch_grpc_call(
                cluster,
                service,
                method,
                initial_metadata,
                Some(payload.as_ref()),
                timeout,
            )
            .map(GrpcCallId::new)
            .map_err(|status| match status {
                Status::ParseFailure => GrpcCallError::InvalidUpstream(cluster.to_string()),
                other => GrpcCallError::Proxy(GrpcProxyError::new(other)),
            });

        GrpcCall::new(reactor, host, decoder, dispatched)
    }

    /// Swaps the codec, keeping every other setting of the builder.
    ///
    /// The previous codec is dropped.
    pub(super) fn codec<T: Codec>(self, codec: T) -> GrpcCallBuilder<'a, T> {
        let Self {
            reactor,
            host,
            upstream,
            service,
            method,
            initial_metadata,
            timeout,
            ..
        } = self;

        GrpcCallBuilder {
            reactor,
            host,
            upstream,
            service,
            method,
            initial_metadata,
            timeout,
            codec,
        }
    }
}