connectrpc_workers/
lib.rs1use connectrpc::client::{BoxFuture, ClientBody, ClientTransport};
45use http::uri::{Authority, PathAndQuery, Scheme};
46use http::{Request, Response, Uri};
47use worker::send::{SendFuture, SendWrapper};
48use worker::{Body, Fetch, Fetcher};
49
50#[derive(Clone)]
55pub struct FetcherTransport {
56 fetcher: SendWrapper<Fetcher>,
57}
58
59impl FetcherTransport {
60 pub fn new(fetcher: Fetcher) -> Self {
61 Self {
62 fetcher: SendWrapper::new(fetcher),
63 }
64 }
65}
66
67impl std::fmt::Debug for FetcherTransport {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("FetcherTransport").finish_non_exhaustive()
70 }
71}
72
73impl ClientTransport for FetcherTransport {
74 type ResponseBody = Body;
75 type Error = worker::Error;
76
77 fn send(
78 &self,
79 request: Request<ClientBody>,
80 ) -> BoxFuture<'static, Result<Response<Self::ResponseBody>, Self::Error>> {
81 let fetcher = (*self.fetcher).clone();
83 Box::pin(SendFuture::new(async move {
84 fetcher.fetch_request(request).await
85 }))
86 }
87}
88
89#[derive(Clone, Debug)]
99pub struct FetchTransport {
100 scheme: Scheme,
101 authority: Authority,
102}
103
104impl FetchTransport {
105 pub fn new(base: Uri) -> Result<Self, worker::Error> {
106 let parts = base.into_parts();
107 let scheme = parts.scheme.ok_or_else(|| {
108 worker::Error::RustError("FetchTransport base URI is missing a scheme".into())
109 })?;
110 let authority = parts.authority.ok_or_else(|| {
111 worker::Error::RustError("FetchTransport base URI is missing an authority".into())
112 })?;
113 Ok(Self { scheme, authority })
114 }
115}
116
117impl ClientTransport for FetchTransport {
118 type ResponseBody = Body;
119 type Error = worker::Error;
120
121 fn send(
122 &self,
123 request: Request<ClientBody>,
124 ) -> BoxFuture<'static, Result<Response<Self::ResponseBody>, Self::Error>> {
125 let scheme = self.scheme.clone();
126 let authority = self.authority.clone();
127 Box::pin(SendFuture::new(async move {
128 let request = rewrite_uri(request, scheme, authority)?;
129 let req = worker::Request::try_from(request)?;
130 Fetch::Request(req).send().await.and_then(|resp| {
131 let resp: http::Response<Body> = resp.try_into()?;
132 Ok(resp)
133 })
134 }))
135 }
136}
137
138fn rewrite_uri<B>(
139 mut req: http::Request<B>,
140 scheme: Scheme,
141 authority: Authority,
142) -> Result<http::Request<B>, worker::Error> {
143 let path_and_query = req
144 .uri()
145 .path_and_query()
146 .cloned()
147 .unwrap_or_else(|| PathAndQuery::from_static("/"));
148 let new_uri = Uri::builder()
149 .scheme(scheme)
150 .authority(authority)
151 .path_and_query(path_and_query)
152 .build()
153 .map_err(|e| worker::Error::RustError(format!("rewrite uri: {e}")))?;
154 *req.uri_mut() = new_uri;
155 Ok(req)
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a body-less request whose URI is `uri`.
    fn request_for(uri: &str) -> http::Request<()> {
        http::Request::builder().uri(uri).body(()).unwrap()
    }

    #[test]
    fn fetch_transport_rejects_uri_without_scheme() {
        let uri: Uri = "/some/path".parse().unwrap();
        let err = FetchTransport::new(uri).unwrap_err();
        assert!(err.to_string().contains("scheme"));
    }

    #[test]
    fn fetch_transport_rejects_uri_without_authority() {
        let uri: Uri = "mailto:user@example.com".parse().unwrap();
        let err = FetchTransport::new(uri).unwrap_err();
        let msg = err.to_string();
        // Either validation error is accepted: which component is reported
        // missing depends on how `http` splits this input.
        assert!(
            msg.contains("authority") || msg.contains("scheme"),
            "unexpected error: {msg}"
        );
    }

    #[test]
    fn fetch_transport_accepts_https_uri() {
        let uri: Uri = "https://example.com:8443/base".parse().unwrap();
        let t = FetchTransport::new(uri).expect("valid base URI");
        assert_eq!(t.scheme, Scheme::HTTPS);
        assert_eq!(t.authority.as_str(), "example.com:8443");
    }

    #[test]
    fn rewrite_uri_replaces_scheme_and_authority_keeps_path() {
        let req = request_for("http://placeholder.invalid/foo/bar?x=1");
        let authority: Authority = "real.example:8443".parse().unwrap();

        let rewritten = rewrite_uri(req, Scheme::HTTPS, authority).unwrap();

        let uri = rewritten.uri();
        assert_eq!(uri.scheme_str(), Some("https"));
        assert_eq!(
            uri.authority().map(Authority::as_str),
            Some("real.example:8443")
        );
        assert_eq!(uri.path(), "/foo/bar");
        assert_eq!(uri.query(), Some("x=1"));
    }

    #[test]
    fn rewrite_uri_defaults_path_to_root_when_missing() {
        let req = request_for("http://placeholder");
        let authority: Authority = "real.example".parse().unwrap();

        let rewritten = rewrite_uri(req, Scheme::HTTP, authority).unwrap();

        assert_eq!(rewritten.uri().path(), "/");
    }

    /// Compile-time check: both transports satisfy the `ClientTransport` bound.
    #[test]
    fn transports_implement_client_transport() {
        fn assert_transport<T: ClientTransport>() {}
        assert_transport::<FetcherTransport>();
        assert_transport::<FetchTransport>();
    }
}