tower_grpc/generic/server/
unary.rs

1use super::server_streaming;
2use crate::generic::server::UnaryService;
3use crate::generic::{Encode, Encoder};
4use crate::{Request, Response};
5
6use futures::{try_ready, Future, Poll, Stream};
7use std::fmt;
8use tower_service::Service;
9
10pub struct ResponseFuture<T, E, S>
11where
12    T: UnaryService<S::Item>,
13    S: Stream,
14{
15    inner: server_streaming::ResponseFuture<Inner<T>, E, S>,
16}
17
18// TODO: Use type in futures-rs instead
19#[derive(Debug)]
20pub struct Once<T> {
21    inner: Option<T>,
22}
23
24/// Maps inbound requests
25#[derive(Debug, Clone)]
26struct Inner<T>(pub T);
27
28#[derive(Debug)]
29struct InnerFuture<T>(T);
30
31// ===== impl ResponseFuture ======
32
33impl<T, E, S> ResponseFuture<T, E, S>
34where
35    T: UnaryService<S::Item, Response = E::Item>,
36    E: Encoder,
37    S: Stream<Error = crate::Status>,
38{
39    pub fn new(inner: T, request: Request<S>, encoder: E) -> Self {
40        let inner = server_streaming::ResponseFuture::new(Inner(inner), request, encoder);
41        ResponseFuture { inner }
42    }
43}
44
45impl<T, E, S> Future for ResponseFuture<T, E, S>
46where
47    T: UnaryService<S::Item, Response = E::Item>,
48    E: Encoder,
49    S: Stream<Error = crate::Status>,
50{
51    type Item = http::Response<Encode<E, Once<T::Response>>>;
52    type Error = crate::error::Never;
53
54    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
55        self.inner.poll()
56    }
57}
58
59// ===== impl Inner =====
60
61impl<T, R> Service<Request<R>> for Inner<T>
62where
63    T: UnaryService<R>,
64{
65    type Response = Response<Once<T::Response>>;
66    type Error = crate::Status;
67    type Future = InnerFuture<T::Future>;
68
69    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
70        Ok(().into())
71    }
72
73    fn call(&mut self, request: Request<R>) -> Self::Future {
74        let inner = self.0.call(request);
75        InnerFuture(inner)
76    }
77}
78
79// ===== impl InnerFuture ======
80
81impl<T, U> Future for InnerFuture<T>
82where
83    T: Future<Item = Response<U>, Error = crate::Status>,
84{
85    type Item = Response<Once<U>>;
86    type Error = crate::Status;
87
88    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
89        let response = try_ready!(self.0.poll());
90        Ok(Once::map(response).into())
91    }
92}
93
94// ===== impl Once =====
95
96impl<T> Once<T> {
97    /// Map a response to a response of a `Once` stream
98    pub(super) fn map(response: Response<T>) -> Response<Self> {
99        response.map(|body| Once { inner: Some(body) })
100    }
101}
102
103impl<T> Stream for Once<T> {
104    type Item = T;
105    type Error = crate::Status;
106
107    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
108        Ok(self.inner.take().into())
109    }
110}
111
112impl<T, E, S> fmt::Debug for ResponseFuture<T, E, S>
113where
114    T: UnaryService<S::Item> + fmt::Debug,
115    T::Response: fmt::Debug,
116    T::Future: fmt::Debug,
117    E: fmt::Debug,
118    S: Stream + fmt::Debug,
119{
120    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
121        fmt.debug_struct("unary::ResponseFuture")
122            .field("inner", &self.inner)
123            .finish()
124    }
125}