tower_grpc/generic/server/
unary.rs1use super::server_streaming;
2use crate::generic::server::UnaryService;
3use crate::generic::{Encode, Encoder};
4use crate::{Request, Response};
5
6use futures::{try_ready, Future, Poll, Stream};
7use std::fmt;
8use tower_service::Service;
9
10pub struct ResponseFuture<T, E, S>
11where
12 T: UnaryService<S::Item>,
13 S: Stream,
14{
15 inner: server_streaming::ResponseFuture<Inner<T>, E, S>,
16}
17
18#[derive(Debug)]
20pub struct Once<T> {
21 inner: Option<T>,
22}
23
24#[derive(Debug, Clone)]
26struct Inner<T>(pub T);
27
28#[derive(Debug)]
29struct InnerFuture<T>(T);
30
31impl<T, E, S> ResponseFuture<T, E, S>
34where
35 T: UnaryService<S::Item, Response = E::Item>,
36 E: Encoder,
37 S: Stream<Error = crate::Status>,
38{
39 pub fn new(inner: T, request: Request<S>, encoder: E) -> Self {
40 let inner = server_streaming::ResponseFuture::new(Inner(inner), request, encoder);
41 ResponseFuture { inner }
42 }
43}
44
45impl<T, E, S> Future for ResponseFuture<T, E, S>
46where
47 T: UnaryService<S::Item, Response = E::Item>,
48 E: Encoder,
49 S: Stream<Error = crate::Status>,
50{
51 type Item = http::Response<Encode<E, Once<T::Response>>>;
52 type Error = crate::error::Never;
53
54 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
55 self.inner.poll()
56 }
57}
58
59impl<T, R> Service<Request<R>> for Inner<T>
62where
63 T: UnaryService<R>,
64{
65 type Response = Response<Once<T::Response>>;
66 type Error = crate::Status;
67 type Future = InnerFuture<T::Future>;
68
69 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
70 Ok(().into())
71 }
72
73 fn call(&mut self, request: Request<R>) -> Self::Future {
74 let inner = self.0.call(request);
75 InnerFuture(inner)
76 }
77}
78
79impl<T, U> Future for InnerFuture<T>
82where
83 T: Future<Item = Response<U>, Error = crate::Status>,
84{
85 type Item = Response<Once<U>>;
86 type Error = crate::Status;
87
88 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
89 let response = try_ready!(self.0.poll());
90 Ok(Once::map(response).into())
91 }
92}
93
94impl<T> Once<T> {
97 pub(super) fn map(response: Response<T>) -> Response<Self> {
99 response.map(|body| Once { inner: Some(body) })
100 }
101}
102
103impl<T> Stream for Once<T> {
104 type Item = T;
105 type Error = crate::Status;
106
107 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
108 Ok(self.inner.take().into())
109 }
110}
111
112impl<T, E, S> fmt::Debug for ResponseFuture<T, E, S>
113where
114 T: UnaryService<S::Item> + fmt::Debug,
115 T::Response: fmt::Debug,
116 T::Future: fmt::Debug,
117 E: fmt::Debug,
118 S: Stream + fmt::Debug,
119{
120 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
121 fmt.debug_struct("unary::ResponseFuture")
122 .field("inner", &self.inner)
123 .finish()
124 }
125}