Skip to main content

connectrpc_workers/
lib.rs

1//! ConnectRPC [`ClientTransport`] implementations backed by the Cloudflare
2//! Workers fetch APIs.
3//!
4//! Two transports are provided:
5//!
6//! * [`FetcherTransport`] wraps a [`worker::Fetcher`] (a `[[services]]`
7//!   binding). Use it for inter-service calls within the same Cloudflare
8//!   zone. The runtime short-circuits these requests, so there's no DNS
9//!   lookup, no TLS handshake, and no trip out to the public internet.
10//! * [`FetchTransport`] wraps the global [`worker::Fetch`] for arbitrary
11//!   `http://` / `https://` URLs.
12//!
13//! # Sendness
14//!
15//! `ClientTransport` requires `Send + Sync + 'static` on the type and a
16//! `Send + 'static` future. Workers' fetch is `!Send` (everything in
17//! JS-land is `!Send`). We use [`worker::send::SendFuture`] /
18//! [`worker::send::SendWrapper`] to satisfy the bound. workers-rs ships
19//! these specifically because the Workers isolate is single-threaded, so
20//! nothing is ever actually moved across threads.
21//!
22//! # Protocol
23//!
24//! Use [`connectrpc::Protocol::Connect`] (or `GrpcWeb`). Workers fetch
25//! subrequests don't expose raw HTTP/2, so gRPC's trailer requirement
26//! won't survive. Connect over HTTP/1.1 and GrpcWeb (trailers in body)
27//! both work.
28//!
29//! # Example
30//!
31//! ```ignore
32//! use connectrpc::client::ClientConfig;
33//! use connectrpc::Protocol;
34//! use connectrpc_workers::FetcherTransport;
35//!
36//! // Inside a #[event(fetch)] handler:
37//! let echo = env.service("ECHO")?;
38//! let transport = FetcherTransport::new(echo);
39//! let config = ClientConfig::new("http://echo/".parse()?).protocol(Protocol::Connect);
40//! let client = EchoServiceClient::new(transport, config);
41//! let resp = client.echo(EchoRequest { message: "hi".into() }).await?;
42//! ```
43
44use connectrpc::client::{BoxFuture, ClientBody, ClientTransport};
45use http::uri::{Authority, PathAndQuery, Scheme};
46use http::{Request, Response, Uri};
47use worker::send::{SendFuture, SendWrapper};
48use worker::{Body, Fetch, Fetcher};
49
50/// `ClientTransport` backed by a Workers service binding.
51///
52/// Construct with [`Self::new`]. Cloning is cheap because the underlying
53/// `Fetcher` is just a `JsValue` handle.
54#[derive(Clone)]
55pub struct FetcherTransport {
56    fetcher: SendWrapper<Fetcher>,
57}
58
59impl FetcherTransport {
60    pub fn new(fetcher: Fetcher) -> Self {
61        Self {
62            fetcher: SendWrapper::new(fetcher),
63        }
64    }
65}
66
67impl std::fmt::Debug for FetcherTransport {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("FetcherTransport").finish_non_exhaustive()
70    }
71}
72
73impl ClientTransport for FetcherTransport {
74    type ResponseBody = Body;
75    type Error = worker::Error;
76
77    fn send(
78        &self,
79        request: Request<ClientBody>,
80    ) -> BoxFuture<'static, Result<Response<Self::ResponseBody>, Self::Error>> {
81        // Clone the JsValue handle so the future owns it.
82        let fetcher = (*self.fetcher).clone();
83        Box::pin(SendFuture::new(async move {
84            fetcher.fetch_request(request).await
85        }))
86    }
87}
88
89/// `ClientTransport` backed by the global Workers `fetch` (arbitrary URL).
90///
91/// Useful for hitting an external ConnectRPC server. For same-zone calls
92/// prefer [`FetcherTransport`], since service bindings skip DNS and TLS
93/// and don't count against egress.
94///
95/// The transport rewrites the request URI's scheme and authority to point
96/// at `base`, so generated clients can keep using arbitrary `Uri`s in
97/// their `ClientConfig` without caring where the service actually lives.
98#[derive(Clone, Debug)]
99pub struct FetchTransport {
100    scheme: Scheme,
101    authority: Authority,
102}
103
104impl FetchTransport {
105    pub fn new(base: Uri) -> Result<Self, worker::Error> {
106        let parts = base.into_parts();
107        let scheme = parts.scheme.ok_or_else(|| {
108            worker::Error::RustError("FetchTransport base URI is missing a scheme".into())
109        })?;
110        let authority = parts.authority.ok_or_else(|| {
111            worker::Error::RustError("FetchTransport base URI is missing an authority".into())
112        })?;
113        Ok(Self { scheme, authority })
114    }
115}
116
117impl ClientTransport for FetchTransport {
118    type ResponseBody = Body;
119    type Error = worker::Error;
120
121    fn send(
122        &self,
123        request: Request<ClientBody>,
124    ) -> BoxFuture<'static, Result<Response<Self::ResponseBody>, Self::Error>> {
125        let scheme = self.scheme.clone();
126        let authority = self.authority.clone();
127        Box::pin(SendFuture::new(async move {
128            let request = rewrite_uri(request, scheme, authority)?;
129            let req = worker::Request::try_from(request)?;
130            Fetch::Request(req).send().await.and_then(|resp| {
131                let resp: http::Response<Body> = resp.try_into()?;
132                Ok(resp)
133            })
134        }))
135    }
136}
137
138fn rewrite_uri<B>(
139    mut req: http::Request<B>,
140    scheme: Scheme,
141    authority: Authority,
142) -> Result<http::Request<B>, worker::Error> {
143    let path_and_query = req
144        .uri()
145        .path_and_query()
146        .cloned()
147        .unwrap_or_else(|| PathAndQuery::from_static("/"));
148    let new_uri = Uri::builder()
149        .scheme(scheme)
150        .authority(authority)
151        .path_and_query(path_and_query)
152        .build()
153        .map_err(|e| worker::Error::RustError(format!("rewrite uri: {e}")))?;
154    *req.uri_mut() = new_uri;
155    Ok(req)
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    #[test]
163    fn fetch_transport_rejects_uri_without_scheme() {
164        // Path-only URIs parse fine but don't have enough info to send a
165        // real request; we surface that at construction time, not at the
166        // first call.
167        let uri: Uri = "/some/path".parse().unwrap();
168        let err = FetchTransport::new(uri).unwrap_err();
169        assert!(format!("{err}").contains("scheme"));
170    }
171
172    #[test]
173    fn fetch_transport_rejects_uri_without_authority() {
174        // `mailto:` parses with a scheme but no authority.
175        let uri: Uri = "mailto:user@example.com".parse().unwrap();
176        let err = FetchTransport::new(uri).unwrap_err();
177        let msg = format!("{err}");
178        // Either authority or scheme could trigger first depending on
179        // parsing; both are "this URI can't address an HTTP server."
180        assert!(
181            msg.contains("authority") || msg.contains("scheme"),
182            "unexpected error: {msg}"
183        );
184    }
185
186    #[test]
187    fn fetch_transport_accepts_https_uri() {
188        let uri: Uri = "https://example.com:8443/base".parse().unwrap();
189        let t = FetchTransport::new(uri).expect("valid base URI");
190        assert_eq!(t.scheme, Scheme::HTTPS);
191        assert_eq!(t.authority.as_str(), "example.com:8443");
192    }
193
194    #[test]
195    fn rewrite_uri_replaces_scheme_and_authority_keeps_path() {
196        let req: http::Request<()> = http::Request::builder()
197            .uri("http://placeholder.invalid/foo/bar?x=1")
198            .body(())
199            .unwrap();
200        let scheme = Scheme::HTTPS;
201        let authority: Authority = "real.example:8443".parse().unwrap();
202        let rewritten = rewrite_uri(req, scheme, authority).unwrap();
203        let uri = rewritten.uri();
204        assert_eq!(uri.scheme_str(), Some("https"));
205        assert_eq!(
206            uri.authority().map(|a| a.as_str()),
207            Some("real.example:8443")
208        );
209        assert_eq!(uri.path(), "/foo/bar");
210        assert_eq!(uri.query(), Some("x=1"));
211    }
212
213    #[test]
214    fn rewrite_uri_defaults_path_to_root_when_missing() {
215        // `http::Uri` parsed from just `http://host` has no path-and-query.
216        let req: http::Request<()> = http::Request::builder()
217            .uri("http://placeholder")
218            .body(())
219            .unwrap();
220        let scheme = Scheme::HTTP;
221        let authority: Authority = "real.example".parse().unwrap();
222        let rewritten = rewrite_uri(req, scheme, authority).unwrap();
223        assert_eq!(rewritten.uri().path(), "/");
224    }
225
226    /// Compile-time check that both transports satisfy `ClientTransport`,
227    /// including its `Send + Sync + 'static` bound. If this stops compiling,
228    /// one of the `SendWrapper` / `SendFuture` shims is no longer pulling
229    /// its weight.
230    #[test]
231    fn transports_implement_client_transport() {
232        fn assert_transport<T: ClientTransport>() {}
233        assert_transport::<FetcherTransport>();
234        assert_transport::<FetchTransport>();
235    }
236}