tower_grpc/client/
mod.rs

1//! gRPC client
2
3pub mod client_streaming;
4pub mod server_streaming;
5pub mod streaming;
6pub mod unary;
7
8use crate::body::BoxBody;
9use crate::generic::client::{GrpcService, IntoService};
10
11use futures::{stream, Future, Poll, Stream};
12use http::{uri, Uri};
13use prost::Message;
14
15/// gRPC client handle.
16///
17/// Takes an HTTP service and adds the gRPC protocol.
18#[derive(Debug, Clone)]
19pub struct Grpc<T> {
20    /// The inner HTTP/2.0 service.
21    inner: T,
22}
23
24/// Convert a stream of protobuf messages to an HTTP body payload.
25pub trait Encodable<T> {
26    fn into_encode(self) -> T;
27}
28
29// ===== impl Grpc =====
30
31impl<T> Grpc<T> {
32    /// Create a new `Grpc` instance backed by the given HTTP service.
33    pub fn new(inner: T) -> Self {
34        Grpc { inner }
35    }
36
37    /// Returns `Ready` when the service is ready to accept a request.
38    pub fn poll_ready<R>(&mut self) -> Poll<(), crate::Status>
39    where
40        T: GrpcService<R>,
41    {
42        self.inner
43            .poll_ready()
44            .map_err(|err| crate::Status::from_error(&*(err.into())))
45    }
46
47    /// Consumes `self`, returning a future that yields `self` back once it is ready to accept a
48    /// request.
49    pub fn ready<R>(self) -> impl Future<Item = Self, Error = crate::Status>
50    where
51        T: GrpcService<R>,
52    {
53        use tower_util::Ready;
54        Ready::new(self.inner.into_service())
55            .map(|IntoService(inner)| Grpc { inner })
56            .map_err(|err| crate::Status::from_error(&*(err.into())))
57    }
58
59    /// Send a unary gRPC request.
60    pub fn unary<M1, M2, R>(
61        &mut self,
62        request: crate::Request<M1>,
63        path: uri::PathAndQuery,
64    ) -> unary::ResponseFuture<M2, T::Future, T::ResponseBody>
65    where
66        T: GrpcService<R>,
67        unary::Once<M1>: Encodable<R>,
68    {
69        let request = request.map(|v| stream::once(Ok(v)));
70        let response = self.client_streaming(request, path);
71
72        unary::ResponseFuture::new(response)
73    }
74
75    /// Send a client streaing gRPC request.
76    pub fn client_streaming<B, M, R>(
77        &mut self,
78        request: crate::Request<B>,
79        path: uri::PathAndQuery,
80    ) -> client_streaming::ResponseFuture<M, T::Future, T::ResponseBody>
81    where
82        T: GrpcService<R>,
83        B: Encodable<R>,
84    {
85        let response = self.streaming(request, path);
86        client_streaming::ResponseFuture::new(response)
87    }
88
89    /// Send a server streaming gRPC request.
90    pub fn server_streaming<M1, M2, R>(
91        &mut self,
92        request: crate::Request<M1>,
93        path: uri::PathAndQuery,
94    ) -> server_streaming::ResponseFuture<M2, T::Future>
95    where
96        T: GrpcService<R>,
97        unary::Once<M1>: Encodable<R>,
98    {
99        let request = request.map(|v| stream::once(Ok(v)));
100        let response = self.streaming(request, path);
101
102        server_streaming::ResponseFuture::new(response)
103    }
104
105    /// Initiate a full streaming gRPC request
106    ///
107    /// # Generics
108    ///
109    /// **B**: The request stream of gRPC message values.
110    /// **M**: The response **message** (not stream) type.
111    /// **R**: The type of the request body.
112    pub fn streaming<B, M, R>(
113        &mut self,
114        request: crate::Request<B>,
115        path: uri::PathAndQuery,
116    ) -> streaming::ResponseFuture<M, T::Future>
117    where
118        T: GrpcService<R>,
119        B: Encodable<R>,
120    {
121        use http::header::{self, HeaderValue};
122
123        // TODO: validate the path
124
125        // Get the gRPC's method URI
126        let mut parts = uri::Parts::default();
127        parts.path_and_query = Some(path);
128
129        // Get the URI;
130        let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");
131
132        // Convert the request body
133        let request = request.map(Encodable::into_encode);
134
135        // Convert to an HTTP request
136        let mut request = request.into_http(uri);
137
138        // Add the gRPC related HTTP headers
139        request
140            .headers_mut()
141            .insert(header::TE, HeaderValue::from_static("trailers"));
142
143        // Set the content type
144        // TODO: Don't hard code this here
145        let content_type = "application/grpc+proto";
146        request
147            .headers_mut()
148            .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
149
150        // Call the inner HTTP service
151        let response = self.inner.call(request);
152
153        streaming::ResponseFuture::new(response)
154    }
155}
156
157// ===== impl Encodable =====
158
159impl<T, U> Encodable<BoxBody> for T
160where
161    T: Stream<Item = U, Error = crate::Status> + Send + 'static,
162    U: Message + 'static,
163{
164    fn into_encode(self) -> BoxBody {
165        use crate::codec::Encoder;
166        use crate::generic::Encode;
167
168        let encode = Encode::request(Encoder::new(), self);
169        BoxBody::new(Box::new(encode))
170    }
171}