grpc/client/
http_response_to_grpc_frames.rs1use std::collections::VecDeque;
3
4use futures::future::TryFutureExt;
5use futures::stream::Stream;
6
7use httpbis::Headers;
8
9use bytes::Bytes;
10
11use crate::result;
12
13use crate::error::Error;
14use crate::error::GrpcMessageError;
15
16use crate::proto::grpc_frame_parser::GrpcFrameParser;
17use crate::proto::grpc_status::GrpcStatus;
18use crate::proto::headers::HEADER_GRPC_MESSAGE;
19use crate::proto::headers::HEADER_GRPC_STATUS;
20use crate::proto::metadata::Metadata;
21use crate::resp::*;
22use crate::stream_item::*;
23use futures::task::Context;
24use httpbis::DataOrTrailers;
25use httpbis::HttpStreamAfterHeaders;
26use std::pin::Pin;
27use std::task::Poll;
28
29fn init_headers_to_metadata(headers: Headers) -> result::Result<Metadata> {
30 if headers.get_opt(":status") != Some("200") {
31 return Err(Error::Other("not 200"));
32 }
33
34 if let Some(grpc_status) = headers.get_opt_parse(HEADER_GRPC_STATUS) {
37 if grpc_status != GrpcStatus::Ok as i32 {
38 let message = headers
39 .get_opt(HEADER_GRPC_MESSAGE)
40 .unwrap_or("unknown error");
41 return Err(Error::GrpcMessage(GrpcMessageError {
42 grpc_status: grpc_status,
43 grpc_message: message.to_owned(),
44 }));
45 }
46 }
47
48 Ok(Metadata::from_headers(headers)?)
49}
50
51pub fn http_response_to_grpc_frames(response: httpbis::Response) -> StreamingResponse<Bytes> {
52 StreamingResponse::new(response.0.map_err(|e| crate::Error::from(e)).and_then(
53 |(headers, rem)| async {
54 let metadata = init_headers_to_metadata(headers)?;
55 let frames: GrpcStreamWithTrailingMetadata<Bytes> = GrpcStreamWithTrailingMetadata::new(
56 GrpcFrameFromHttpFramesStreamResponse::new(rem),
57 );
58 Ok((metadata, frames))
59 },
60 ))
61}
62
63struct GrpcFrameFromHttpFramesStreamResponse {
64 http_stream_stream: HttpStreamAfterHeaders,
65 buf: GrpcFrameParser,
66 parsed_frames: VecDeque<Bytes>,
67}
68
69impl GrpcFrameFromHttpFramesStreamResponse {
70 pub fn new(http_stream_stream: HttpStreamAfterHeaders) -> Self {
71 GrpcFrameFromHttpFramesStreamResponse {
72 http_stream_stream,
73 buf: GrpcFrameParser::default(),
74 parsed_frames: VecDeque::new(),
75 }
76 }
77}
78
79impl Stream for GrpcFrameFromHttpFramesStreamResponse {
80 type Item = crate::Result<ItemOrMetadata<Bytes>>;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 loop {
84 let frames = self.buf.next_frames()?.0;
85 self.parsed_frames.extend(frames);
86
87 if let Some(frame) = self.parsed_frames.pop_front() {
88 return Poll::Ready(Some(Ok(ItemOrMetadata::Item(frame))));
89 }
90
91 let part_opt =
92 match unsafe { Pin::new_unchecked(&mut self.http_stream_stream) }.poll_next(cx)? {
93 Poll::Pending => return Poll::Pending,
94 Poll::Ready(part_opt) => part_opt,
95 };
96 let part = match part_opt {
97 None => {
98 self.buf.check_empty()?;
99 return Poll::Ready(None);
100 }
101 Some(part) => part,
102 };
103
104 match part {
105 DataOrTrailers::Trailers(headers) => {
106 self.buf.check_empty()?;
107 let grpc_status = headers.get_opt_parse(HEADER_GRPC_STATUS);
108 if grpc_status == Some(GrpcStatus::Ok as i32) {
109 return Poll::Ready(Some(Ok(ItemOrMetadata::TrailingMetadata(
110 Metadata::from_headers(headers)?,
111 ))));
112 } else {
113 return Poll::Ready(Some(Err(
114 if let Some(message) = headers.get_opt(HEADER_GRPC_MESSAGE) {
115 Error::GrpcMessage(GrpcMessageError {
116 grpc_status: grpc_status.unwrap_or(GrpcStatus::Unknown as i32),
117 grpc_message: message.to_owned(),
118 })
119 } else {
120 Error::Other("not xxx")
121 },
122 )));
123 }
124 }
125 DataOrTrailers::Data(data, ..) => {
126 self.buf.enqueue(data);
127 }
128 }
129 }
130 }
131}