pub mod client_streaming;
pub mod server_streaming;
pub mod streaming;
pub mod unary;
use crate::body::BoxBody;
use crate::generic::client::{GrpcService, IntoService};
use futures::{stream, Future, Poll, Stream};
use http::{uri, Uri};
use prost::Message;
#[derive(Debug, Clone)]
pub struct Grpc<T> {
inner: T,
}
pub trait Encodable<T> {
fn into_encode(self) -> T;
}
impl<T> Grpc<T> {
pub fn new(inner: T) -> Self {
Grpc { inner }
}
pub fn poll_ready<R>(&mut self) -> Poll<(), crate::Status>
where
T: GrpcService<R>,
{
self.inner
.poll_ready()
.map_err(|err| crate::Status::from_error(&*(err.into())))
}
pub fn ready<R>(self) -> impl Future<Item = Self, Error = crate::Status>
where
T: GrpcService<R>,
{
use tower_util::Ready;
Ready::new(self.inner.into_service())
.map(|IntoService(inner)| Grpc { inner })
.map_err(|err| crate::Status::from_error(&*(err.into())))
}
pub fn unary<M1, M2, R>(
&mut self,
request: crate::Request<M1>,
path: uri::PathAndQuery,
) -> unary::ResponseFuture<M2, T::Future, T::ResponseBody>
where
T: GrpcService<R>,
unary::Once<M1>: Encodable<R>,
{
let request = request.map(|v| stream::once(Ok(v)));
let response = self.client_streaming(request, path);
unary::ResponseFuture::new(response)
}
pub fn client_streaming<B, M, R>(
&mut self,
request: crate::Request<B>,
path: uri::PathAndQuery,
) -> client_streaming::ResponseFuture<M, T::Future, T::ResponseBody>
where
T: GrpcService<R>,
B: Encodable<R>,
{
let response = self.streaming(request, path);
client_streaming::ResponseFuture::new(response)
}
pub fn server_streaming<M1, M2, R>(
&mut self,
request: crate::Request<M1>,
path: uri::PathAndQuery,
) -> server_streaming::ResponseFuture<M2, T::Future>
where
T: GrpcService<R>,
unary::Once<M1>: Encodable<R>,
{
let request = request.map(|v| stream::once(Ok(v)));
let response = self.streaming(request, path);
server_streaming::ResponseFuture::new(response)
}
pub fn streaming<B, M, R>(
&mut self,
request: crate::Request<B>,
path: uri::PathAndQuery,
) -> streaming::ResponseFuture<M, T::Future>
where
T: GrpcService<R>,
B: Encodable<R>,
{
use http::header::{self, HeaderValue};
let mut parts = uri::Parts::default();
parts.path_and_query = Some(path);
let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");
let request = request.map(Encodable::into_encode);
let mut request = request.into_http(uri);
request
.headers_mut()
.insert(header::TE, HeaderValue::from_static("trailers"));
let content_type = "application/grpc+proto";
request
.headers_mut()
.insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
let response = self.inner.call(request);
streaming::ResponseFuture::new(response)
}
}
impl<T, U> Encodable<BoxBody> for T
where
T: Stream<Item = U, Error = crate::Status> + Send + 'static,
U: Message + 'static,
{
fn into_encode(self) -> BoxBody {
use crate::codec::Encoder;
use crate::generic::Encode;
let encode = Encode::request(Encoder::new(), self);
BoxBody::new(Box::new(encode))
}
}