xitca_web/handler/types/
grpc.rs1use http_grpc_rs::stream::RequestStream;
23use prost::Message;
24
25pub use http_grpc_rs::{
26 codec::{Codec, DEFAULT_LIMIT},
27 error::{GrpcError, ProtocolError},
28 status::GrpcStatus,
29 stream::ResponseBody as GrpcStreamResponse,
30};
31
32use crate::{
33 body::{BodyStream, RequestBody, ResponseBody},
34 context::WebContext,
35 error::Error,
36 handler::{FromRequest, Responder},
37 http::WebResponse,
38};
39
40pub struct Grpc<T, const LIMIT: usize = DEFAULT_LIMIT>(pub T);
50
51impl<'a, 'r, C, B, T, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for Grpc<T, LIMIT>
52where
53 B: BodyStream + Default + Unpin + 'static,
54 T: Message + Default,
55{
56 type Type<'b> = Grpc<T, LIMIT>;
57 type Error = Error;
58
59 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
60 let mut stream = from_ctx::<_, _, LIMIT>(ctx)?;
61 let msg = stream
62 .message()
63 .await
64 .map_err(Error::from_service)?
65 .ok_or_else(|| Error::from_service(GrpcError::new(GrpcStatus::Internal, "empty grpc request body")))?;
66 Ok(Grpc(msg))
67 }
68}
69
70impl<'r, C, B, T, const LIMIT: usize> Responder<WebContext<'r, C, B>> for Grpc<T, LIMIT>
71where
72 T: Message + Unpin + 'static,
73{
74 type Response = WebResponse;
75 type Error = Error;
76
77 async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
78 GrpcStreamResponse::once(self.0).respond(ctx).await
79 }
80}
81
82pub type GrpcStreamRequest = RequestStream<RequestBody>;
102
103fn from_ctx<C, B, const LIMIT: usize>(ctx: &WebContext<'_, C, B>) -> Result<GrpcStreamRequest, Error>
104where
105 B: BodyStream + Default + 'static,
106{
107 let body = ctx.take_body_ref();
108 let body = crate::body::downcast_body(body);
109
110 let mut stream = RequestStream::new(ctx.req().headers(), body);
111 stream.codec_mut().set_limit(LIMIT);
112 Ok(stream)
113}
114
115impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for GrpcStreamRequest
116where
117 B: BodyStream + Default + 'static,
118{
119 type Type<'b> = GrpcStreamRequest;
120 type Error = Error;
121
122 async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
123 from_ctx::<_, _, DEFAULT_LIMIT>(ctx)
124 }
125}
126
127pub use http_grpc_rs::stream::ResponseSender as GrpcStreamSender;
129
130impl<'r, C, B, T> Responder<WebContext<'r, C, B>> for http_grpc_rs::stream::ResponseBody<T>
131where
132 T: Message + Unpin + 'static,
133{
134 type Response = WebResponse;
135 type Error = Error;
136
137 async fn respond(self, mut ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
138 let req = core::mem::take(ctx.req_mut()).map(|_| ());
139 let res = self.into_response(req);
140 Ok(res.map(ResponseBody::boxed))
141 }
142
143 fn map(self, _: Self::Response) -> Result<Self::Response, Self::Error>
144 where
145 Self: Sized,
146 {
147 panic!(
148 "GrpcStreamResponse must be the first item from a series of Responder type. It needs WebContext for content encoding"
149 )
150 }
151}