1use crate::log::instrument;
2use bytes::Bytes;
3use futures::{Stream, TryStreamExt};
4use gloo::net::http::{
5 Headers as GlooHttpHeaders, Method, Request as GlooHttpRequest,
6 RequestBuilder as GlooHttpRequestBuilder, Response as GlooHttpResponse,
7};
8use http::{
9 header::{
10 InvalidHeaderName as HttpInvalidHeaderName, InvalidHeaderValue as HttpInvalidHeaderValue,
11 ToStrError as HttpHeaderToStrError,
12 },
13 Error as HttpError, HeaderName, Request as HttpRequest, Response as HttpResponse,
14};
15use http_body::{Body as HttpBody, Frame as HttpBodyFrame};
16use http_body_util::BodyExt;
17use js_sys::Uint8Array;
18use nill::{nil, Nil};
19use std::{
20 future::Future,
21 pin::Pin,
22 task::{Context, Poll},
23};
24use tonic::{body::Body as GrpcBody, Status};
25use tonic_web::GrpcWebCall;
26use tower::Service;
27use wasm_bindgen::JsValue;
28use wasm_streams::ReadableStream as WasmReadableStream;
29use web_sys::{ReadableStream as HttpReadableStream, RequestMode};
30
31#[derive(Debug, thiserror::Error)]
32pub enum GrpcWebError {
33 #[error(transparent)]
34 HttpError(#[from] HttpError),
35
36 #[error(transparent)]
37 HttpHeaderToStr(#[from] HttpHeaderToStrError),
38
39 #[error(transparent)]
40 HttpInvalidHeaderName(#[from] HttpInvalidHeaderName),
41
42 #[error(transparent)]
43 HttpInvalidHeaderValue(#[from] HttpInvalidHeaderValue),
44
45 #[error(transparent)]
46 TonicStatus(#[from] Status),
47
48 #[error(transparent)]
49 GlooNet(#[from] gloo::net::Error),
50
51 #[error("{0}")]
52 Generic(String),
53}
54
/// Expands to `Err(GrpcWebError::Generic(..))`, with the message built by forwarding all
/// arguments to `format!`.
macro_rules! grpc_err {
    ($($fmt_args:tt)*) => {
        Err(GrpcWebError::Generic(format!($($fmt_args)*)))
    };
}
60
61impl From<JsValue> for GrpcWebError {
62 fn from(err: JsValue) -> Self {
63 GrpcWebError::Generic(format!("{err:?}"))
64 }
65}
66
67trait HttpRequestExt {
68 async fn try_into_fetch(self) -> Result<GlooHttpRequest, GrpcWebError>;
69}
70
71impl HttpRequestExt for HttpRequest<GrpcWebCall<GrpcBody>> {
72 async fn try_into_fetch(self) -> Result<GlooHttpRequest, GrpcWebError> {
73 let uri = self.uri().to_string();
74 let headers = GlooHttpHeaders::new();
75 for (key, val) in self.headers() {
76 headers.set(key.as_str(), val.to_str()?);
77 }
78 let bytes = self.into_body().collect().await?.to_bytes();
79 tracing::info!("{bytes:?}");
80 let fetch = GlooHttpRequestBuilder::new(&uri)
81 .mode(RequestMode::Cors)
82 .headers(headers)
83 .method(Method::POST)
84 .body(Uint8Array::from(&*bytes))?;
85 Ok(fetch)
86 }
87}
88
89trait HttpResponseExt {
90 async fn try_into_grpc(self) -> Result<HttpResponse<GrpcBody>, GrpcWebError>;
91}
92
93impl HttpResponseExt for GlooHttpResponse {
94 async fn try_into_grpc(self) -> Result<HttpResponse<GrpcBody>, GrpcWebError> {
95 if let Some(http_stream) = self.body() {
96 let body = GrpcBody::new(GrpcWebCallStream::new(http_stream));
97 let mut grpc = HttpResponse::builder().status(self.status()).body(body)?;
98 let headers = grpc.headers_mut();
99 for (key, val) in self.headers().entries() {
100 headers.insert(HeaderName::try_from(key)?, val.parse()?);
101 }
102 Ok(grpc)
103 } else {
104 grpc_err!("HTTP content return None: {self:?}")
105 }
106 }
107}
108
109pub struct GrpcWebCallStream {
110 inner: Pin<Box<dyn Stream<Item = Result<HttpBodyFrame<Bytes>, GrpcWebError>>>>,
111}
112
113impl GrpcWebCallStream {
114 pub fn new(http_stream: HttpReadableStream) -> Self {
115 let wasm_stream = WasmReadableStream::from_raw(http_stream)
116 .into_stream()
117 .map_ok(|data| HttpBodyFrame::data(Bytes::from(Uint8Array::new(&data).to_vec())))
118 .map_err(GrpcWebError::from);
119
120 Self {
121 inner: Box::pin(wasm_stream),
122 }
123 }
124}
125
126unsafe impl Send for GrpcWebCallStream {}
127
128impl HttpBody for GrpcWebCallStream {
129 type Data = Bytes;
130
131 type Error = GrpcWebError;
132
133 fn poll_frame(
134 mut self: Pin<&mut Self>,
135 cx: &mut Context<'_>,
136 ) -> Poll<Option<Result<HttpBodyFrame<Self::Data>, Self::Error>>> {
137 self.inner.as_mut().poll_next(cx)
139 }
140}
141
142#[derive(Debug, Default, Clone)]
143pub struct Client {}
144
145impl Client {
146 pub fn new() -> Self {
147 Self::default()
148 }
149
150 #[instrument(skip_all, err, fields(url = ?grpc.uri()))]
151 async fn grpc_web_call(
152 self,
153 grpc: HttpRequest<GrpcWebCall<GrpcBody>>,
154 ) -> Result<HttpResponse<GrpcBody>, GrpcWebError> {
155 let fetch = grpc.try_into_fetch().await?;
156 fetch.send().await?.try_into_grpc().await
157 }
158}
159
160impl Service<HttpRequest<GrpcWebCall<GrpcBody>>> for Client {
161 type Response = HttpResponse<GrpcBody>;
162
163 type Error = GrpcWebError;
164
165 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
166
167 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<Nil, Self::Error>> {
168 Poll::Ready(Ok(nil))
169 }
170
171 fn call(&mut self, grpc: HttpRequest<GrpcWebCall<GrpcBody>>) -> Self::Future {
172 Box::pin(self.clone().grpc_web_call(grpc))
174 }
175}