tower_grpc/client/
client_streaming.rs

1use super::streaming;
2use crate::codec::Streaming;
3use crate::error::Error;
4use crate::Body;
5
6use futures::{try_ready, Future, Poll, Stream};
7use http::{response, Response};
8use prost::Message;
9use std::fmt;
10
11pub struct ResponseFuture<T, U, B: Body> {
12    state: State<T, U, B>,
13}
14
15enum State<T, U, B: Body> {
16    /// Waiting for the HTTP response
17    WaitResponse(streaming::ResponseFuture<T, U>),
18    /// Waiting for the gRPC Proto message in the Response body
19    WaitMessage {
20        head: Option<response::Parts>,
21        stream: Streaming<T, B>,
22    },
23}
24
25impl<T, U, B: Body> ResponseFuture<T, U, B> {
26    /// Create a new client-streaming response future.
27    pub(super) fn new(inner: streaming::ResponseFuture<T, U>) -> Self {
28        let state = State::WaitResponse(inner);
29        ResponseFuture { state }
30    }
31}
32
33impl<T, U, B> Future for ResponseFuture<T, U, B>
34where
35    T: Message + Default,
36    U: Future<Item = Response<B>>,
37    U::Error: Into<Error>,
38    B: Body,
39    B::Error: Into<Error>,
40{
41    type Item = crate::Response<T>;
42    type Error = crate::Status;
43
44    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
45        loop {
46            let response = match self.state {
47                State::WaitResponse(ref mut inner) => try_ready!(inner.poll()),
48                State::WaitMessage {
49                    ref mut head,
50                    ref mut stream,
51                } => {
52                    let message = match try_ready!(stream.poll()) {
53                        Some(message) => message,
54                        None => {
55                            return Err(crate::Status::new(
56                                crate::Code::Internal,
57                                "Missing response message.",
58                            ));
59                        }
60                    };
61
62                    let head = head.take().unwrap();
63                    let response = Response::from_parts(head, message);
64
65                    return Ok(crate::Response::from_http(response).into());
66                }
67            };
68
69            let (head, body) = response.into_http().into_parts();
70
71            self.state = State::WaitMessage {
72                head: Some(head),
73                stream: body,
74            };
75        }
76    }
77}
78
79impl<T, U, B> fmt::Debug for ResponseFuture<T, U, B>
80where
81    T: fmt::Debug,
82    U: fmt::Debug,
83    B: Body + fmt::Debug,
84    B::Data: fmt::Debug,
85{
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        f.debug_struct("ResponseFuture")
88            .field("state", &self.state)
89            .finish()
90    }
91}
92
93impl<T, U, B> fmt::Debug for State<T, U, B>
94where
95    T: fmt::Debug,
96    U: fmt::Debug,
97    B: Body + fmt::Debug,
98    B::Data: fmt::Debug,
99{
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        match *self {
102            State::WaitResponse(ref future) => f.debug_tuple("WaitResponse").field(future).finish(),
103            State::WaitMessage {
104                ref head,
105                ref stream,
106            } => f
107                .debug_struct("WaitMessage")
108                .field("head", head)
109                .field("stream", stream)
110                .finish(),
111        }
112    }
113}