t_web/
grpc_web_client.rs

1use crate::log::instrument;
2use bytes::Bytes;
3use futures::{Stream, TryStreamExt};
4use gloo::net::http::{
5    Headers as GlooHttpHeaders, Method, Request as GlooHttpRequest,
6    RequestBuilder as GlooHttpRequestBuilder, Response as GlooHttpResponse,
7};
8use http::{
9    header::{
10        InvalidHeaderName as HttpInvalidHeaderName, InvalidHeaderValue as HttpInvalidHeaderValue,
11        ToStrError as HttpHeaderToStrError,
12    },
13    Error as HttpError, HeaderName, Request as HttpRequest, Response as HttpResponse,
14};
15use http_body::{Body as HttpBody, Frame as HttpBodyFrame};
16use http_body_util::BodyExt;
17use js_sys::Uint8Array;
18use nill::{nil, Nil};
19use std::{
20    future::Future,
21    pin::Pin,
22    task::{Context, Poll},
23};
24use tonic::{body::Body as GrpcBody, Status};
25use tonic_web::GrpcWebCall;
26use tower::Service;
27use wasm_bindgen::JsValue;
28use wasm_streams::ReadableStream as WasmReadableStream;
29use web_sys::{ReadableStream as HttpReadableStream, RequestMode};
30
31#[derive(Debug, thiserror::Error)]
32pub enum GrpcWebError {
33    #[error(transparent)]
34    HttpError(#[from] HttpError),
35
36    #[error(transparent)]
37    HttpHeaderToStr(#[from] HttpHeaderToStrError),
38
39    #[error(transparent)]
40    HttpInvalidHeaderName(#[from] HttpInvalidHeaderName),
41
42    #[error(transparent)]
43    HttpInvalidHeaderValue(#[from] HttpInvalidHeaderValue),
44
45    #[error(transparent)]
46    TonicStatus(#[from] Status),
47
48    #[error(transparent)]
49    GlooNet(#[from] gloo::net::Error),
50
51    #[error("{0}")]
52    Generic(String),
53}
54
/// Expands to `Err(GrpcWebError::Generic(..))`, with the message built from
/// `format!`-style arguments.
macro_rules! grpc_err {
    ($($fmt_args:tt)*) => {
        Err(GrpcWebError::Generic(format!($($fmt_args)*)))
    };
}
60
61impl From<JsValue> for GrpcWebError {
62    fn from(err: JsValue) -> Self {
63        GrpcWebError::Generic(format!("{err:?}"))
64    }
65}
66
67trait HttpRequestExt {
68    async fn try_into_fetch(self) -> Result<GlooHttpRequest, GrpcWebError>;
69}
70
71impl HttpRequestExt for HttpRequest<GrpcWebCall<GrpcBody>> {
72    async fn try_into_fetch(self) -> Result<GlooHttpRequest, GrpcWebError> {
73        let uri = self.uri().to_string();
74        let headers = GlooHttpHeaders::new();
75        for (key, val) in self.headers() {
76            headers.set(key.as_str(), val.to_str()?);
77        }
78        let bytes = self.into_body().collect().await?.to_bytes();
79        tracing::info!("{bytes:?}");
80        let fetch = GlooHttpRequestBuilder::new(&uri)
81            .mode(RequestMode::Cors)
82            .headers(headers)
83            .method(Method::POST)
84            .body(Uint8Array::from(&*bytes))?;
85        Ok(fetch)
86    }
87}
88
89trait HttpResponseExt {
90    async fn try_into_grpc(self) -> Result<HttpResponse<GrpcBody>, GrpcWebError>;
91}
92
93impl HttpResponseExt for GlooHttpResponse {
94    async fn try_into_grpc(self) -> Result<HttpResponse<GrpcBody>, GrpcWebError> {
95        if let Some(http_stream) = self.body() {
96            let body = GrpcBody::new(GrpcWebCallStream::new(http_stream));
97            let mut grpc = HttpResponse::builder().status(self.status()).body(body)?;
98            let headers = grpc.headers_mut();
99            for (key, val) in self.headers().entries() {
100                headers.insert(HeaderName::try_from(key)?, val.parse()?);
101            }
102            Ok(grpc)
103        } else {
104            grpc_err!("HTTP content return None: {self:?}")
105        }
106    }
107}
108
109pub struct GrpcWebCallStream {
110    inner: Pin<Box<dyn Stream<Item = Result<HttpBodyFrame<Bytes>, GrpcWebError>>>>,
111}
112
113impl GrpcWebCallStream {
114    pub fn new(http_stream: HttpReadableStream) -> Self {
115        let wasm_stream = WasmReadableStream::from_raw(http_stream)
116            .into_stream()
117            .map_ok(|data| HttpBodyFrame::data(Bytes::from(Uint8Array::new(&data).to_vec())))
118            .map_err(GrpcWebError::from);
119
120        Self {
121            inner: Box::pin(wasm_stream),
122        }
123    }
124}
125
126unsafe impl Send for GrpcWebCallStream {}
127
128impl HttpBody for GrpcWebCallStream {
129    type Data = Bytes;
130
131    type Error = GrpcWebError;
132
133    fn poll_frame(
134        mut self: Pin<&mut Self>,
135        cx: &mut Context<'_>,
136    ) -> Poll<Option<Result<HttpBodyFrame<Self::Data>, Self::Error>>> {
137        // TODO: void dyn
138        self.inner.as_mut().poll_next(cx)
139    }
140}
141
/// gRPC-Web client that implements `tower::Service`.
///
/// The type has no fields, so cloning it is free.
#[derive(Debug, Default, Clone)]
pub struct Client {}
144
145impl Client {
146    pub fn new() -> Self {
147        Self::default()
148    }
149
150    #[instrument(skip_all, err, fields(url = ?grpc.uri()))]
151    async fn grpc_web_call(
152        self,
153        grpc: HttpRequest<GrpcWebCall<GrpcBody>>,
154    ) -> Result<HttpResponse<GrpcBody>, GrpcWebError> {
155        let fetch = grpc.try_into_fetch().await?;
156        fetch.send().await?.try_into_grpc().await
157    }
158}
159
160impl Service<HttpRequest<GrpcWebCall<GrpcBody>>> for Client {
161    type Response = HttpResponse<GrpcBody>;
162
163    type Error = GrpcWebError;
164
165    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
166
167    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<Nil, Self::Error>> {
168        Poll::Ready(Ok(nil))
169    }
170
171    fn call(&mut self, grpc: HttpRequest<GrpcWebCall<GrpcBody>>) -> Self::Future {
172        // TODO: void clone
173        Box::pin(self.clone().grpc_web_call(grpc))
174    }
175}