tower_grpc/client/
streaming.rs1use crate::codec::{Direction, Streaming};
2use crate::error::Error;
3use crate::Body;
4use crate::Code;
5
6use futures::{try_ready, Future, Poll};
7use http::Response;
8use prost::Message;
9use std::marker::PhantomData;
10
11#[derive(Debug)]
12pub struct ResponseFuture<T, U> {
13 inner: U,
14 _m: PhantomData<T>,
15}
16
17impl<T, U> ResponseFuture<T, U> {
18 pub(super) fn new(inner: U) -> Self {
20 ResponseFuture {
21 inner,
22 _m: PhantomData,
23 }
24 }
25}
26
27impl<T, U, B> Future for ResponseFuture<T, U>
28where
29 T: Message + Default,
30 U: Future<Item = Response<B>>,
31 U::Error: Into<Error>,
32 B: Body,
33{
34 type Item = crate::Response<Streaming<T, B>>;
35 type Error = crate::Status;
36
37 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
38 use crate::codec::Decoder;
39 use crate::generic::Streaming;
40
41 let response = try_ready!(self
43 .inner
44 .poll()
45 .map_err(|err| crate::Status::from_error(&*(err.into()))));
46
47 let status_code = response.status();
48
49 let trailers_only_status = crate::Status::from_header_map(response.headers());
51 let expect_additional_trailers = trailers_only_status.is_none();
52 if let Some(status) = trailers_only_status {
53 if status.code() != Code::Ok {
54 return Err(status);
55 }
56 }
57
58 let streaming_direction = if expect_additional_trailers {
59 Direction::Response(status_code)
60 } else {
61 Direction::EmptyResponse
62 };
63
64 let response =
65 response.map(move |body| Streaming::new(Decoder::new(), body, streaming_direction));
66
67 Ok(crate::Response::from_http(response).into())
68 }
69}