tower_grpc/server/
client_streaming.rs1use crate::codec::{Encode, Encoder};
2use crate::generic::server::{client_streaming, unary, ClientStreamingService};
3
4use futures::{try_ready, Future, Poll, Stream};
5use std::fmt;
6
7pub struct ResponseFuture<T, S>
8where
9 T: ClientStreamingService<S>,
10 S: Stream<Error = crate::Status>,
11{
12 inner: Inner<T::Future, T::Response>,
13}
14
15type Inner<T, U> = client_streaming::ResponseFuture<T, Encoder<U>>;
16
17impl<T, S> ResponseFuture<T, S>
18where
19 T: ClientStreamingService<S>,
20 S: Stream<Error = crate::Status>,
21 S::Item: prost::Message + Default,
22 T::Response: prost::Message,
23{
24 pub(crate) fn new(inner: Inner<T::Future, T::Response>) -> Self {
25 ResponseFuture { inner }
26 }
27}
28
29impl<T, S> Future for ResponseFuture<T, S>
30where
31 T: ClientStreamingService<S>,
32 S: Stream<Error = crate::Status>,
33 S::Item: prost::Message + Default,
34 T::Response: prost::Message,
35{
36 type Item = http::Response<Encode<unary::Once<T::Response>>>;
37 type Error = crate::error::Never;
38
39 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
40 let response = try_ready!(self.inner.poll());
41 let response = response.map(Encode::new);
42 Ok(response.into())
43 }
44}
45
46impl<T, S> fmt::Debug for ResponseFuture<T, S>
47where
48 T: ClientStreamingService<S> + fmt::Debug,
49 S: Stream<Error = crate::Status> + fmt::Debug,
50 T::Response: fmt::Debug,
51 T::Future: fmt::Debug,
52{
53 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
54 fmt.debug_struct("client_streaming::ResponseFuture")
55 .field("inner", &self.inner)
56 .finish()
57 }
58}