grpc/client/
http_response_to_grpc_frames.rs

1///! Convert HTTP response stream to gRPC stream
2use std::collections::VecDeque;
3
4use futures::future::TryFutureExt;
5use futures::stream::Stream;
6
7use httpbis::Headers;
8
9use bytes::Bytes;
10
11use crate::result;
12
13use crate::error::Error;
14use crate::error::GrpcMessageError;
15
16use crate::proto::grpc_frame_parser::GrpcFrameParser;
17use crate::proto::grpc_status::GrpcStatus;
18use crate::proto::headers::HEADER_GRPC_MESSAGE;
19use crate::proto::headers::HEADER_GRPC_STATUS;
20use crate::proto::metadata::Metadata;
21use crate::resp::*;
22use crate::stream_item::*;
23use futures::task::Context;
24use httpbis::DataOrTrailers;
25use httpbis::HttpStreamAfterHeaders;
26use std::pin::Pin;
27use std::task::Poll;
28
29fn init_headers_to_metadata(headers: Headers) -> result::Result<Metadata> {
30    if headers.get_opt(":status") != Some("200") {
31        return Err(Error::Other("not 200"));
32    }
33
34    // Check gRPC status code and message
35    // TODO: a more detailed error message.
36    if let Some(grpc_status) = headers.get_opt_parse(HEADER_GRPC_STATUS) {
37        if grpc_status != GrpcStatus::Ok as i32 {
38            let message = headers
39                .get_opt(HEADER_GRPC_MESSAGE)
40                .unwrap_or("unknown error");
41            return Err(Error::GrpcMessage(GrpcMessageError {
42                grpc_status: grpc_status,
43                grpc_message: message.to_owned(),
44            }));
45        }
46    }
47
48    Ok(Metadata::from_headers(headers)?)
49}
50
51pub fn http_response_to_grpc_frames(response: httpbis::Response) -> StreamingResponse<Bytes> {
52    StreamingResponse::new(response.0.map_err(|e| crate::Error::from(e)).and_then(
53        |(headers, rem)| async {
54            let metadata = init_headers_to_metadata(headers)?;
55            let frames: GrpcStreamWithTrailingMetadata<Bytes> = GrpcStreamWithTrailingMetadata::new(
56                GrpcFrameFromHttpFramesStreamResponse::new(rem),
57            );
58            Ok((metadata, frames))
59        },
60    ))
61}
62
63struct GrpcFrameFromHttpFramesStreamResponse {
64    http_stream_stream: HttpStreamAfterHeaders,
65    buf: GrpcFrameParser,
66    parsed_frames: VecDeque<Bytes>,
67}
68
69impl GrpcFrameFromHttpFramesStreamResponse {
70    pub fn new(http_stream_stream: HttpStreamAfterHeaders) -> Self {
71        GrpcFrameFromHttpFramesStreamResponse {
72            http_stream_stream,
73            buf: GrpcFrameParser::default(),
74            parsed_frames: VecDeque::new(),
75        }
76    }
77}
78
79impl Stream for GrpcFrameFromHttpFramesStreamResponse {
80    type Item = crate::Result<ItemOrMetadata<Bytes>>;
81
82    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83        loop {
84            let frames = self.buf.next_frames()?.0;
85            self.parsed_frames.extend(frames);
86
87            if let Some(frame) = self.parsed_frames.pop_front() {
88                return Poll::Ready(Some(Ok(ItemOrMetadata::Item(frame))));
89            }
90
91            let part_opt =
92                match unsafe { Pin::new_unchecked(&mut self.http_stream_stream) }.poll_next(cx)? {
93                    Poll::Pending => return Poll::Pending,
94                    Poll::Ready(part_opt) => part_opt,
95                };
96            let part = match part_opt {
97                None => {
98                    self.buf.check_empty()?;
99                    return Poll::Ready(None);
100                }
101                Some(part) => part,
102            };
103
104            match part {
105                DataOrTrailers::Trailers(headers) => {
106                    self.buf.check_empty()?;
107                    let grpc_status = headers.get_opt_parse(HEADER_GRPC_STATUS);
108                    if grpc_status == Some(GrpcStatus::Ok as i32) {
109                        return Poll::Ready(Some(Ok(ItemOrMetadata::TrailingMetadata(
110                            Metadata::from_headers(headers)?,
111                        ))));
112                    } else {
113                        return Poll::Ready(Some(Err(
114                            if let Some(message) = headers.get_opt(HEADER_GRPC_MESSAGE) {
115                                Error::GrpcMessage(GrpcMessageError {
116                                    grpc_status: grpc_status.unwrap_or(GrpcStatus::Unknown as i32),
117                                    grpc_message: message.to_owned(),
118                                })
119                            } else {
120                                Error::Other("not xxx")
121                            },
122                        )));
123                    }
124                }
125                DataOrTrailers::Data(data, ..) => {
126                    self.buf.enqueue(data);
127                }
128            }
129        }
130    }
131}