tower_grpc/client/
client_streaming.rs1use super::streaming;
2use crate::codec::Streaming;
3use crate::error::Error;
4use crate::Body;
5
6use futures::{try_ready, Future, Poll, Stream};
7use http::{response, Response};
8use prost::Message;
9use std::fmt;
10
11pub struct ResponseFuture<T, U, B: Body> {
12 state: State<T, U, B>,
13}
14
15enum State<T, U, B: Body> {
16 WaitResponse(streaming::ResponseFuture<T, U>),
18 WaitMessage {
20 head: Option<response::Parts>,
21 stream: Streaming<T, B>,
22 },
23}
24
25impl<T, U, B: Body> ResponseFuture<T, U, B> {
26 pub(super) fn new(inner: streaming::ResponseFuture<T, U>) -> Self {
28 let state = State::WaitResponse(inner);
29 ResponseFuture { state }
30 }
31}
32
33impl<T, U, B> Future for ResponseFuture<T, U, B>
34where
35 T: Message + Default,
36 U: Future<Item = Response<B>>,
37 U::Error: Into<Error>,
38 B: Body,
39 B::Error: Into<Error>,
40{
41 type Item = crate::Response<T>;
42 type Error = crate::Status;
43
44 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
45 loop {
46 let response = match self.state {
47 State::WaitResponse(ref mut inner) => try_ready!(inner.poll()),
48 State::WaitMessage {
49 ref mut head,
50 ref mut stream,
51 } => {
52 let message = match try_ready!(stream.poll()) {
53 Some(message) => message,
54 None => {
55 return Err(crate::Status::new(
56 crate::Code::Internal,
57 "Missing response message.",
58 ));
59 }
60 };
61
62 let head = head.take().unwrap();
63 let response = Response::from_parts(head, message);
64
65 return Ok(crate::Response::from_http(response).into());
66 }
67 };
68
69 let (head, body) = response.into_http().into_parts();
70
71 self.state = State::WaitMessage {
72 head: Some(head),
73 stream: body,
74 };
75 }
76 }
77}
78
79impl<T, U, B> fmt::Debug for ResponseFuture<T, U, B>
80where
81 T: fmt::Debug,
82 U: fmt::Debug,
83 B: Body + fmt::Debug,
84 B::Data: fmt::Debug,
85{
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 f.debug_struct("ResponseFuture")
88 .field("state", &self.state)
89 .finish()
90 }
91}
92
93impl<T, U, B> fmt::Debug for State<T, U, B>
94where
95 T: fmt::Debug,
96 U: fmt::Debug,
97 B: Body + fmt::Debug,
98 B::Data: fmt::Debug,
99{
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 match *self {
102 State::WaitResponse(ref future) => f.debug_tuple("WaitResponse").field(future).finish(),
103 State::WaitMessage {
104 ref head,
105 ref stream,
106 } => f
107 .debug_struct("WaitMessage")
108 .field("head", head)
109 .field("stream", stream)
110 .finish(),
111 }
112 }
113}