tower_grpc/server/
streaming.rs

1use crate::codec::{Encode, Encoder};
2use crate::generic::server::{streaming, StreamingService};
3
4use futures::{try_ready, Future, Poll, Stream};
5use std::fmt;
6
7pub struct ResponseFuture<T, S>
8where
9    T: StreamingService<S>,
10    S: Stream<Error = crate::Status>,
11    S::Item: prost::Message + Default,
12    T::Response: prost::Message,
13{
14    inner: Inner<T::Future, T::Response>,
15}
16
17type Inner<T, U> = streaming::ResponseFuture<T, Encoder<U>>;
18
19impl<T, S> ResponseFuture<T, S>
20where
21    T: StreamingService<S>,
22    S: Stream<Error = crate::Status>,
23    S::Item: prost::Message + Default,
24    T::Response: prost::Message,
25{
26    pub(crate) fn new(inner: Inner<T::Future, T::Response>) -> Self {
27        ResponseFuture { inner }
28    }
29}
30
31impl<T, S> Future for ResponseFuture<T, S>
32where
33    T: StreamingService<S>,
34    S: Stream<Error = crate::Status>,
35    S::Item: prost::Message + Default,
36    T::Response: prost::Message,
37{
38    type Item = http::Response<Encode<T::ResponseStream>>;
39    type Error = crate::error::Never;
40
41    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
42        let response = try_ready!(self.inner.poll());
43        let response = response.map(Encode::new);
44        Ok(response.into())
45    }
46}
47
48impl<T, S> fmt::Debug for ResponseFuture<T, S>
49where
50    T: StreamingService<S> + fmt::Debug,
51    S: Stream<Error = crate::Status> + fmt::Debug,
52    S::Item: prost::Message + Default + fmt::Debug,
53    T::Response: prost::Message + fmt::Debug,
54    T::ResponseStream: fmt::Debug,
55    T::Future: fmt::Debug,
56{
57    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
58        fmt.debug_struct("streaming::ResponseFuture")
59            .field("inner", &self.inner)
60            .finish()
61    }
62}