tower_grpc/client/
streaming.rs

1use crate::codec::{Direction, Streaming};
2use crate::error::Error;
3use crate::Body;
4use crate::Code;
5
6use futures::{try_ready, Future, Poll};
7use http::Response;
8use prost::Message;
9use std::marker::PhantomData;
10
11#[derive(Debug)]
12pub struct ResponseFuture<T, U> {
13    inner: U,
14    _m: PhantomData<T>,
15}
16
17impl<T, U> ResponseFuture<T, U> {
18    /// Create a new client-streaming response future.
19    pub(super) fn new(inner: U) -> Self {
20        ResponseFuture {
21            inner,
22            _m: PhantomData,
23        }
24    }
25}
26
27impl<T, U, B> Future for ResponseFuture<T, U>
28where
29    T: Message + Default,
30    U: Future<Item = Response<B>>,
31    U::Error: Into<Error>,
32    B: Body,
33{
34    type Item = crate::Response<Streaming<T, B>>;
35    type Error = crate::Status;
36
37    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
38        use crate::codec::Decoder;
39        use crate::generic::Streaming;
40
41        // Get the response
42        let response = try_ready!(self
43            .inner
44            .poll()
45            .map_err(|err| crate::Status::from_error(&*(err.into()))));
46
47        let status_code = response.status();
48
49        // Check the headers for `grpc-status`, in which case we should not parse the body.
50        let trailers_only_status = crate::Status::from_header_map(response.headers());
51        let expect_additional_trailers = trailers_only_status.is_none();
52        if let Some(status) = trailers_only_status {
53            if status.code() != Code::Ok {
54                return Err(status);
55            }
56        }
57
58        let streaming_direction = if expect_additional_trailers {
59            Direction::Response(status_code)
60        } else {
61            Direction::EmptyResponse
62        };
63
64        let response =
65            response.map(move |body| Streaming::new(Decoder::new(), body, streaming_direction));
66
67        Ok(crate::Response::from_http(response).into())
68    }
69}