tower_grpc/server/
client_streaming.rs

1use crate::codec::{Encode, Encoder};
2use crate::generic::server::{client_streaming, unary, ClientStreamingService};
3
4use futures::{try_ready, Future, Poll, Stream};
5use std::fmt;
6
7pub struct ResponseFuture<T, S>
8where
9    T: ClientStreamingService<S>,
10    S: Stream<Error = crate::Status>,
11{
12    inner: Inner<T::Future, T::Response>,
13}
14
15type Inner<T, U> = client_streaming::ResponseFuture<T, Encoder<U>>;
16
17impl<T, S> ResponseFuture<T, S>
18where
19    T: ClientStreamingService<S>,
20    S: Stream<Error = crate::Status>,
21    S::Item: prost::Message + Default,
22    T::Response: prost::Message,
23{
24    pub(crate) fn new(inner: Inner<T::Future, T::Response>) -> Self {
25        ResponseFuture { inner }
26    }
27}
28
29impl<T, S> Future for ResponseFuture<T, S>
30where
31    T: ClientStreamingService<S>,
32    S: Stream<Error = crate::Status>,
33    S::Item: prost::Message + Default,
34    T::Response: prost::Message,
35{
36    type Item = http::Response<Encode<unary::Once<T::Response>>>;
37    type Error = crate::error::Never;
38
39    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
40        let response = try_ready!(self.inner.poll());
41        let response = response.map(Encode::new);
42        Ok(response.into())
43    }
44}
45
46impl<T, S> fmt::Debug for ResponseFuture<T, S>
47where
48    T: ClientStreamingService<S> + fmt::Debug,
49    S: Stream<Error = crate::Status> + fmt::Debug,
50    T::Response: fmt::Debug,
51    T::Future: fmt::Debug,
52{
53    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
54        fmt.debug_struct("client_streaming::ResponseFuture")
55            .field("inner", &self.inner)
56            .finish()
57    }
58}