Skip to main content

xitca_web/handler/types/
grpc.rs

1//! type extractor and response generator for grpc unary messages.
2//!
3//! # Extracting and responding
4//! ```ignore
5//! use xitca_web::handler::grpc::Grpc;
6//!
7//! async fn say_hello(Grpc(req): Grpc<HelloRequest>) -> Grpc<HelloReply> {
8//!     Grpc(HelloReply { response: req.request })
9//! }
10//! ```
11//!
12//! # Returning errors
13//! ```ignore
14//! use xitca_web::handler::grpc::{Grpc, GrpcError, GrpcStatus};
15//!
16//! async fn say_hello(Grpc(req): Grpc<HelloRequest>) -> Result<Grpc<HelloReply>, GrpcError> {
17//!     let name = req.request.ok_or_else(|| GrpcError::new(GrpcStatus::InvalidArgument, "missing request"))?;
18//!     Ok(Grpc(HelloReply { response: Some(name) }))
19//! }
20//! ```
21
22use http_grpc_rs::stream::RequestStream;
23use prost::Message;
24
25pub use http_grpc_rs::{
26    codec::{Codec, DEFAULT_LIMIT},
27    error::{GrpcError, ProtocolError},
28    status::GrpcStatus,
29    stream::ResponseBody as GrpcStreamResponse,
30};
31
32use crate::{
33    body::{BodyStream, RequestBody, ResponseBody},
34    context::WebContext,
35    error::Error,
36    handler::{FromRequest, Responder},
37    http::WebResponse,
38};
39
40/// Extract and response type for unary gRPC messages.
41///
42/// As an extractor, decodes the gRPC length-prefixed wire format (1 byte compression flag +
43/// 4 byte big-endian length + protobuf payload) from the request body.
44///
45/// As a responder, encodes the protobuf message with gRPC framing and appends `grpc-status: 0`
46/// trailers.
47///
48/// Const generic `LIMIT` controls the maximum request body size in bytes. Default is [`DEFAULT_LIMIT`].
49pub struct Grpc<T, const LIMIT: usize = DEFAULT_LIMIT>(pub T);
50
51impl<'a, 'r, C, B, T, const LIMIT: usize> FromRequest<'a, WebContext<'r, C, B>> for Grpc<T, LIMIT>
52where
53    B: BodyStream + Default + Unpin + 'static,
54    T: Message + Default,
55{
56    type Type<'b> = Grpc<T, LIMIT>;
57    type Error = Error;
58
59    async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
60        let mut stream = from_ctx::<_, _, LIMIT>(ctx)?;
61        let msg = stream
62            .message()
63            .await
64            .map_err(Error::from_service)?
65            .ok_or_else(|| Error::from_service(GrpcError::new(GrpcStatus::Internal, "empty grpc request body")))?;
66        Ok(Grpc(msg))
67    }
68}
69
70impl<'r, C, B, T, const LIMIT: usize> Responder<WebContext<'r, C, B>> for Grpc<T, LIMIT>
71where
72    T: Message + Unpin + 'static,
73{
74    type Response = WebResponse;
75    type Error = Error;
76
77    async fn respond(self, ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
78        GrpcStreamResponse::once(self.0).respond(ctx).await
79    }
80}
81
82/// Client-streaming extractor that yields a stream of `T: Message` from the request body.
83///
84/// Wraps [`http_grpc::stream::RequestStream`] with the framework's `RequestBody` type.
85///
86/// `LIMIT` controls the maximum allowed size in bytes for a single gRPC message frame.
87/// Defaults to [`DEFAULT_LIMIT`] (4 MiB). Set to `0` for unlimited.
88///
89/// # Example
90/// ```ignore
91/// use xitca_web::handler::grpc::{GrpcStreamRequest, Grpc};
92///
93/// // default 4 MiB limit
94/// async fn client_stream(mut stream: GrpcStreamRequest<MyRequest>) -> Grpc<MyReply> {
95///     while let Some(msg) = stream.message().await? {
96///         // process each MyRequest
97///     }
98///     Grpc(MyReply { .. })
99/// }
100/// ```
101pub type GrpcStreamRequest = RequestStream<RequestBody>;
102
103fn from_ctx<C, B, const LIMIT: usize>(ctx: &WebContext<'_, C, B>) -> Result<GrpcStreamRequest, Error>
104where
105    B: BodyStream + Default + 'static,
106{
107    let body = ctx.take_body_ref();
108    let body = crate::body::downcast_body(body);
109
110    let mut stream = RequestStream::new(ctx.req().headers(), body);
111    stream.codec_mut().set_limit(LIMIT);
112    Ok(stream)
113}
114
115impl<'a, 'r, C, B> FromRequest<'a, WebContext<'r, C, B>> for GrpcStreamRequest
116where
117    B: BodyStream + Default + 'static,
118{
119    type Type<'b> = GrpcStreamRequest;
120    type Error = Error;
121
122    async fn from_request(ctx: &'a WebContext<'r, C, B>) -> Result<Self, Self::Error> {
123        from_ctx::<_, _, DEFAULT_LIMIT>(ctx)
124    }
125}
126
127/// Send gRPC messages to a [`GrpcStreamResponse`].
128pub use http_grpc_rs::stream::ResponseSender as GrpcStreamSender;
129
130impl<'r, C, B, T> Responder<WebContext<'r, C, B>> for http_grpc_rs::stream::ResponseBody<T>
131where
132    T: Message + Unpin + 'static,
133{
134    type Response = WebResponse;
135    type Error = Error;
136
137    async fn respond(self, mut ctx: WebContext<'r, C, B>) -> Result<Self::Response, Self::Error> {
138        let req = core::mem::take(ctx.req_mut()).map(|_| ());
139        let res = self.into_response(req);
140        Ok(res.map(ResponseBody::boxed))
141    }
142
143    fn map(self, _: Self::Response) -> Result<Self::Response, Self::Error>
144    where
145        Self: Sized,
146    {
147        panic!(
148            "GrpcStreamResponse must be the first item from a series of Responder type. It needs WebContext for content encoding"
149        )
150    }
151}