1pub mod client_streaming;
4pub mod server_streaming;
5pub mod streaming;
6pub mod unary;
7
8use crate::body::BoxBody;
9use crate::generic::client::{GrpcService, IntoService};
10
11use futures::{stream, Future, Poll, Stream};
12use http::{uri, Uri};
13use prost::Message;
14
/// Handle that wraps an inner service `T`.
///
/// The struct itself places no bound on `T`; it only stores the value so that
/// the wrapper's methods can delegate to it.
#[derive(Clone, Debug)]
pub struct Grpc<T> {
    /// The wrapped service.
    inner: T,
}
23
/// Conversion of a value into an encoded representation of type `T`.
pub trait Encodable<T> {
    /// Consumes `self` and returns its encoded form as a `T`.
    fn into_encode(self) -> T;
}
28
29impl<T> Grpc<T> {
32 pub fn new(inner: T) -> Self {
34 Grpc { inner }
35 }
36
37 pub fn poll_ready<R>(&mut self) -> Poll<(), crate::Status>
39 where
40 T: GrpcService<R>,
41 {
42 self.inner
43 .poll_ready()
44 .map_err(|err| crate::Status::from_error(&*(err.into())))
45 }
46
47 pub fn ready<R>(self) -> impl Future<Item = Self, Error = crate::Status>
50 where
51 T: GrpcService<R>,
52 {
53 use tower_util::Ready;
54 Ready::new(self.inner.into_service())
55 .map(|IntoService(inner)| Grpc { inner })
56 .map_err(|err| crate::Status::from_error(&*(err.into())))
57 }
58
59 pub fn unary<M1, M2, R>(
61 &mut self,
62 request: crate::Request<M1>,
63 path: uri::PathAndQuery,
64 ) -> unary::ResponseFuture<M2, T::Future, T::ResponseBody>
65 where
66 T: GrpcService<R>,
67 unary::Once<M1>: Encodable<R>,
68 {
69 let request = request.map(|v| stream::once(Ok(v)));
70 let response = self.client_streaming(request, path);
71
72 unary::ResponseFuture::new(response)
73 }
74
75 pub fn client_streaming<B, M, R>(
77 &mut self,
78 request: crate::Request<B>,
79 path: uri::PathAndQuery,
80 ) -> client_streaming::ResponseFuture<M, T::Future, T::ResponseBody>
81 where
82 T: GrpcService<R>,
83 B: Encodable<R>,
84 {
85 let response = self.streaming(request, path);
86 client_streaming::ResponseFuture::new(response)
87 }
88
89 pub fn server_streaming<M1, M2, R>(
91 &mut self,
92 request: crate::Request<M1>,
93 path: uri::PathAndQuery,
94 ) -> server_streaming::ResponseFuture<M2, T::Future>
95 where
96 T: GrpcService<R>,
97 unary::Once<M1>: Encodable<R>,
98 {
99 let request = request.map(|v| stream::once(Ok(v)));
100 let response = self.streaming(request, path);
101
102 server_streaming::ResponseFuture::new(response)
103 }
104
105 pub fn streaming<B, M, R>(
113 &mut self,
114 request: crate::Request<B>,
115 path: uri::PathAndQuery,
116 ) -> streaming::ResponseFuture<M, T::Future>
117 where
118 T: GrpcService<R>,
119 B: Encodable<R>,
120 {
121 use http::header::{self, HeaderValue};
122
123 let mut parts = uri::Parts::default();
127 parts.path_and_query = Some(path);
128
129 let uri = Uri::from_parts(parts).expect("path_and_query only is valid Uri");
131
132 let request = request.map(Encodable::into_encode);
134
135 let mut request = request.into_http(uri);
137
138 request
140 .headers_mut()
141 .insert(header::TE, HeaderValue::from_static("trailers"));
142
143 let content_type = "application/grpc+proto";
146 request
147 .headers_mut()
148 .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
149
150 let response = self.inner.call(request);
152
153 streaming::ResponseFuture::new(response)
154 }
155}
156
157impl<T, U> Encodable<BoxBody> for T
160where
161 T: Stream<Item = U, Error = crate::Status> + Send + 'static,
162 U: Message + 'static,
163{
164 fn into_encode(self) -> BoxBody {
165 use crate::codec::Encoder;
166 use crate::generic::Encode;
167
168 let encode = Encode::request(Encoder::new(), self);
169 BoxBody::new(Box::new(encode))
170 }
171}