pub(crate) mod http_request_to_grpc_frames_typed;
pub(crate) mod http_response_to_grpc_frames;
pub(crate) mod http_response_to_grpc_frames_typed;
pub(crate) mod req_sink;
pub(crate) mod types;
use bytes::Bytes;
use httpbis::ClientTlsOption;
use httpbis::Header;
use httpbis::Headers;
use httpbis::HttpScheme;
use crate::method::MethodDescriptor;
use crate::result;
use crate::client::http_request_to_grpc_frames_typed::http_req_to_grpc_frames_typed;
use crate::client::http_response_to_grpc_frames_typed::http_response_to_grpc_frames_typed;
use crate::client::req_sink::ClientRequestSink;
use crate::error;
use futures::future;
use futures::future::TryFutureExt;
use crate::or_static::arc::ArcOrStatic;
use crate::proto::grpc_frame::write_grpc_frame_cb;
use crate::req::*;
use crate::resp::*;
use std::future::Future;
use std::pin::Pin;
use tokio::runtime::Handle;
#[derive(Default, Debug, Clone)]
pub struct ClientConf {
pub http: httpbis::ClientConf,
}
impl ClientConf {
pub fn new() -> ClientConf {
Default::default()
}
}
enum ClientBuilderType<'a> {
Tcp { port: u16, host: &'a str },
Unix { socket: &'a str },
}
enum Tls<T: tls_api::TlsConnector> {
Explict(ClientTlsOption<T>),
Implicit,
None,
}
pub struct ClientBuilder<'a, T: tls_api::TlsConnector> {
client_type: ClientBuilderType<'a>,
http_scheme: HttpScheme,
event_loop: Option<Handle>,
pub conf: ClientConf,
tls: Tls<T>,
}
impl<'a, T: tls_api::TlsConnector> ClientBuilder<'a, T> {
pub fn with_event_loop(mut self, event_loop: Handle) -> Self {
self.event_loop = Some(event_loop);
self
}
pub fn conf(mut self, config: ClientConf) -> Self {
self.conf = config;
self
}
pub fn build(self) -> result::Result<Client> {
let mut builder = httpbis::ClientBuilder::<T>::new();
let mut conf = self.conf;
conf.http.thread_name = Some(
conf.http
.thread_name
.unwrap_or_else(|| "grpc-client-loop".to_owned()),
);
let (host, port) = match self.client_type {
ClientBuilderType::Tcp { host, port } => {
if self.http_scheme == HttpScheme::Https {
builder.set_tls(host)?;
}
builder.set_addr((host, port))?;
(host, Some(port))
}
ClientBuilderType::Unix { socket } => {
builder.set_unix_addr(socket)?;
(socket, None)
}
};
builder.event_loop = self.event_loop;
builder.conf = conf.http;
match self.tls {
Tls::Explict(tls) => {
builder.tls = tls;
}
Tls::Implicit | Tls::None => {}
}
Ok(Client {
client: ::std::sync::Arc::new(builder.build()?),
host: host.to_owned(),
http_scheme: self.http_scheme,
port,
})
}
}
impl<'a> ClientBuilder<'a, tls_api_stub::TlsConnector> {
pub fn new(host: &'a str, port: u16) -> Self {
ClientBuilder {
client_type: ClientBuilderType::Tcp { port, host },
http_scheme: HttpScheme::Http,
event_loop: None,
conf: Default::default(),
tls: Tls::None,
}
}
pub fn new_unix(addr: &'a str) -> Self {
ClientBuilder {
client_type: ClientBuilderType::Unix { socket: addr },
http_scheme: HttpScheme::Http,
event_loop: None,
conf: Default::default(),
tls: Tls::None,
}
}
pub fn tls<TLS: tls_api::TlsConnector>(self) -> ClientBuilder<'a, TLS> {
ClientBuilder {
client_type: self.client_type,
http_scheme: HttpScheme::Https,
event_loop: self.event_loop,
conf: self.conf,
tls: Tls::Implicit,
}
}
pub fn explicit_tls<TLS: tls_api::TlsConnector>(
self,
tls: ClientTlsOption<TLS>,
) -> ClientBuilder<'a, TLS> {
ClientBuilder {
client_type: self.client_type,
http_scheme: tls.http_scheme(),
event_loop: self.event_loop,
conf: self.conf,
tls: Tls::Explict(tls),
}
}
}
#[derive(Debug)]
pub struct Client {
client: ::std::sync::Arc<httpbis::Client>,
host: String,
http_scheme: HttpScheme,
port: Option<u16>,
}
impl Client {
fn call_impl<Req, Resp>(
&self,
options: RequestOptions,
req: Option<Req>,
method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
) -> Pin<
Box<
dyn Future<Output = crate::Result<(ClientRequestSink<Req>, StreamingResponse<Resp>)>>
+ Send,
>,
>
where
Req: Send + 'static,
Resp: Send + 'static,
{
let mut authority = self.host.clone();
if let Some(port_value) = self.port {
authority = format!("{}:{}", authority, port_value);
}
debug!("start call {}/{}", authority, method.name);
if options.cachable {
}
let mut headers = Headers::from_vec(vec![
Header::new(Bytes::from_static(b":method"), Bytes::from_static(b"POST")),
Header::new(Bytes::from_static(b":path"), method.name.to_string()),
Header::new(Bytes::from_static(b":authority"), authority),
Header::new(
Bytes::from_static(b":scheme"),
Bytes::from_static(self.http_scheme.as_bytes()),
),
Header::new(
Bytes::from_static(b"content-type"),
Bytes::from_static(b"application/grpc"),
),
Header::new(Bytes::from_static(b"te"), Bytes::from_static(b"trailers")),
]);
headers.extend(options.metadata.into_headers());
let req_bytes = match req {
Some(req) => {
let mut frame = Vec::new();
let size_estimate = match method.req_marshaller.write_size_estimate(&req) {
Ok(len) => len,
Err(e) => return Box::pin(future::err(e)),
};
match write_grpc_frame_cb(&mut frame, size_estimate, |frame| {
method.req_marshaller.write(&req, size_estimate, frame)
}) {
Ok(()) => {}
Err(e) => return Box::pin(future::err(e)),
}
Some(Bytes::from(frame))
}
None => None,
};
let end_stream = req_bytes.is_some();
let http_future = self
.client
.start_request(headers, req_bytes, None, end_stream);
let http_future = http_future.map_err(error::Error::from);
let req_marshaller = method.req_marshaller.clone();
let resp_marshaller = method.resp_marshaller.clone();
Box::pin(http_future.map_ok(move |(req, resp)| {
let grpc_req = http_req_to_grpc_frames_typed(req, req_marshaller);
let grpc_resp = http_response_to_grpc_frames_typed(resp, resp_marshaller);
(grpc_req, grpc_resp)
}))
}
pub fn call_unary<Req, Resp>(
&self,
o: RequestOptions,
req: Req,
method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
) -> SingleResponse<Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
SingleResponse::new(
self.call_impl(o, Some(req), method)
.and_then(|(_req, resp)| resp.single()),
)
}
pub fn call_server_streaming<Req, Resp>(
&self,
o: RequestOptions,
req: Req,
method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
) -> StreamingResponse<Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
let f = self.call_impl(o, Some(req), method);
StreamingResponse::new(async move {
let (_req, resp) = f.await?;
resp.0.await
})
}
pub fn call_client_streaming<Req, Resp>(
&self,
o: RequestOptions,
method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
) -> impl Future<Output = crate::Result<(ClientRequestSink<Req>, SingleResponse<Resp>)>>
where
Req: Send + 'static,
Resp: Send + 'static,
{
TryFutureExt::map_ok(self.call_impl(o, None, method), |(req, resp)| {
(req, resp.single())
})
}
pub fn call_bidi<Req, Resp>(
&self,
o: RequestOptions,
method: ArcOrStatic<MethodDescriptor<Req, Resp>>,
) -> impl Future<Output = crate::Result<(ClientRequestSink<Req>, StreamingResponse<Resp>)>>
where
Req: Send + 'static,
Resp: Send + 'static,
{
self.call_impl(o, None, method)
}
}
fn _assert_types() {
crate::assert_types::assert_send::<Client>();
crate::assert_types::assert_sync::<Client>();
}