tower_grpc/server/
streaming.rs1use crate::codec::{Encode, Encoder};
2use crate::generic::server::{streaming, StreamingService};
3
4use futures::{try_ready, Future, Poll, Stream};
5use std::fmt;
6
7pub struct ResponseFuture<T, S>
8where
9 T: StreamingService<S>,
10 S: Stream<Error = crate::Status>,
11 S::Item: prost::Message + Default,
12 T::Response: prost::Message,
13{
14 inner: Inner<T::Future, T::Response>,
15}
16
17type Inner<T, U> = streaming::ResponseFuture<T, Encoder<U>>;
18
19impl<T, S> ResponseFuture<T, S>
20where
21 T: StreamingService<S>,
22 S: Stream<Error = crate::Status>,
23 S::Item: prost::Message + Default,
24 T::Response: prost::Message,
25{
26 pub(crate) fn new(inner: Inner<T::Future, T::Response>) -> Self {
27 ResponseFuture { inner }
28 }
29}
30
31impl<T, S> Future for ResponseFuture<T, S>
32where
33 T: StreamingService<S>,
34 S: Stream<Error = crate::Status>,
35 S::Item: prost::Message + Default,
36 T::Response: prost::Message,
37{
38 type Item = http::Response<Encode<T::ResponseStream>>;
39 type Error = crate::error::Never;
40
41 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
42 let response = try_ready!(self.inner.poll());
43 let response = response.map(Encode::new);
44 Ok(response.into())
45 }
46}
47
48impl<T, S> fmt::Debug for ResponseFuture<T, S>
49where
50 T: StreamingService<S> + fmt::Debug,
51 S: Stream<Error = crate::Status> + fmt::Debug,
52 S::Item: prost::Message + Default + fmt::Debug,
53 T::Response: prost::Message + fmt::Debug,
54 T::ResponseStream: fmt::Debug,
55 T::Future: fmt::Debug,
56{
57 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
58 fmt.debug_struct("streaming::ResponseFuture")
59 .field("inner", &self.inner)
60 .finish()
61 }
62}