tower_grpc/server/
server_streaming.rs

1use crate::codec::{Encode, Encoder, Streaming};
2use crate::generic::server::{server_streaming, ServerStreamingService};
3use crate::Body;
4
5use futures::{try_ready, Future, Poll};
6use std::fmt;
7
8pub struct ResponseFuture<T, B, R>
9where
10    T: ServerStreamingService<R>,
11    B: Body,
12    R: prost::Message + Default,
13{
14    inner: Inner<T, T::Response, R, B>,
15}
16
17type Inner<T, U, V, B> = server_streaming::ResponseFuture<T, Encoder<U>, Streaming<V, B>>;
18
19impl<T, B, R> ResponseFuture<T, B, R>
20where
21    T: ServerStreamingService<R>,
22    R: prost::Message + Default,
23    T::Response: prost::Message,
24    B: Body,
25{
26    pub(crate) fn new(inner: Inner<T, T::Response, R, B>) -> Self {
27        ResponseFuture { inner }
28    }
29}
30
31impl<T, B, R> Future for ResponseFuture<T, B, R>
32where
33    T: ServerStreamingService<R>,
34    R: prost::Message + Default,
35    T::Response: prost::Message,
36    B: Body,
37{
38    type Item = http::Response<Encode<T::ResponseStream>>;
39    type Error = crate::error::Never;
40
41    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
42        let response = try_ready!(self.inner.poll());
43        let response = response.map(Encode::new);
44        Ok(response.into())
45    }
46}
47
48impl<T, B, R> fmt::Debug for ResponseFuture<T, B, R>
49where
50    T: ServerStreamingService<R> + fmt::Debug,
51    T::Response: fmt::Debug,
52    T::ResponseStream: fmt::Debug,
53    T::Future: fmt::Debug,
54    B: Body + fmt::Debug,
55    B::Data: fmt::Debug,
56    R: prost::Message + Default,
57{
58    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59        fmt.debug_struct("server_streaming::ResponseFuture")
60            .field("inner", &self.inner)
61            .finish()
62    }
63}