Skip to main content

multistore_cf_workers/
backend.rs

1//! Backend client for the Cloudflare Workers runtime.
2//!
3//! Contains [`WorkerBackend`], which implements [`ProxyBackend`] by forwarding
4//! requests through the Workers Fetch API and reading responses as
5//! `web_sys::Response` streams.
6
7use crate::body::JsBody;
8use crate::fetch_connector::FetchConnector;
9use crate::headers::WsHeaders;
10use crate::response::headermap_from_js;
11use bytes::Bytes;
12use http::HeaderMap;
13use multistore::backend::ForwardResponse;
14use multistore::backend::{build_signer, create_builder, ProxyBackend, RawResponse, StoreBuilder};
15use multistore::error::ProxyError;
16use multistore::route_handler::ForwardRequest;
17use multistore::types::BucketConfig;
18
19use object_store::list::PaginatedListStore;
20use object_store::signer::Signer;
21use object_store::RetryConfig;
22use std::sync::Arc;
23use worker::Fetch;
24
/// Backend for the Cloudflare Workers runtime.
///
/// A field-less unit type. `object_store` HTTP requests are routed through
/// `FetchConnector`, while forwarded requests and raw multipart operations
/// are sent with the Workers Fetch API (`worker::Fetch`).
///
/// `Debug` is derived so the backend can be logged and embedded in other
/// `#[derive(Debug)]` types.
#[derive(Clone, Debug)]
pub struct WorkerBackend;
31
32impl ProxyBackend for WorkerBackend {
33    type ResponseBody = web_sys::Response;
34    type Body = JsBody;
35
36    async fn forward(
37        &self,
38        request: ForwardRequest,
39        body: JsBody,
40    ) -> Result<ForwardResponse<Self::ResponseBody>, ProxyError> {
41        let js_body = body;
42
43        // Build web_sys::RequestInit.
44        let init = web_sys::RequestInit::new();
45        init.set_method(request.method.as_str());
46        init.set_headers(&WsHeaders::from(&request.headers).into_inner().into());
47
48        // Bypass Cloudflare's subrequest cache for reads where leaving it on
49        // would break the request (HEAD rewritten to GET on cacheable-extension
50        // URLs, or Range poisoning the full-object cache). See
51        // `ForwardRequest::should_bypass_cache`.
52        if request.should_bypass_cache() {
53            init.set_cache(web_sys::RequestCache::NoStore);
54        }
55
56        // For PUT: attach the original ReadableStream directly (zero-copy!).
57        if request.method == http::Method::PUT {
58            if let Some(stream) = js_body.stream() {
59                init.set_body(stream);
60            }
61        }
62
63        // Build the outgoing request.
64        let ws_request = web_sys::Request::new_with_str_and_init(request.url.as_str(), &init)
65            .map_err(|e| ProxyError::Internal(format!("failed to create request: {:?}", e)))?;
66
67        // Fetch via the worker crate's Fetch API.
68        let worker_req: worker::Request = ws_request.into();
69        let worker_resp = worker::Fetch::Request(worker_req)
70            .send()
71            .await
72            .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
73
74        // Convert to web_sys::Response to access the body stream.
75        let backend_ws: web_sys::Response = worker_resp.into();
76        let status = backend_ws.status();
77
78        let headers = headermap_from_js(&backend_ws.headers());
79        let content_length = headers
80            .get(http::header::CONTENT_LENGTH)
81            .and_then(|v| v.to_str().ok())
82            .and_then(|v| v.parse::<u64>().ok());
83
84        Ok(ForwardResponse {
85            status,
86            headers,
87            body: backend_ws,
88            content_length,
89        })
90    }
91
92    fn create_paginated_store(
93        &self,
94        config: &BucketConfig,
95    ) -> Result<Box<dyn PaginatedListStore>, ProxyError> {
96        // Disable retries: object_store's retry logic uses `tokio::time::sleep`
97        // which panics on WASM (`std::time::Instant::now` is unsupported).
98        // See: https://github.com/apache/arrow-rs-object-store/issues/624
99        let no_retry = RetryConfig {
100            max_retries: 0,
101            ..Default::default()
102        };
103        let builder = match create_builder(config)? {
104            StoreBuilder::S3(s) => {
105                StoreBuilder::S3(s.with_http_connector(FetchConnector).with_retry(no_retry))
106            }
107            #[cfg(feature = "azure")]
108            StoreBuilder::Azure(a) => {
109                StoreBuilder::Azure(a.with_http_connector(FetchConnector).with_retry(no_retry))
110            }
111            #[cfg(feature = "gcp")]
112            StoreBuilder::Gcs(g) => {
113                StoreBuilder::Gcs(g.with_http_connector(FetchConnector).with_retry(no_retry))
114            }
115        };
116        builder.build()
117    }
118
119    fn create_signer(&self, config: &BucketConfig) -> Result<Arc<dyn Signer>, ProxyError> {
120        build_signer(config)
121    }
122
123    async fn send_raw(
124        &self,
125        method: http::Method,
126        url: String,
127        headers: HeaderMap,
128        body: Bytes,
129    ) -> Result<RawResponse, ProxyError> {
130        tracing::debug!(
131            method = %method,
132            url = %url,
133            "worker: sending raw backend request via Fetch API"
134        );
135
136        // Build web_sys::RequestInit
137        let init = web_sys::RequestInit::new();
138        init.set_method(method.as_str());
139        init.set_headers(&WsHeaders::from(&headers).into_inner().into());
140
141        // Set body for methods that carry one
142        if !body.is_empty() {
143            let uint8 = js_sys::Uint8Array::from(body.as_ref());
144            init.set_body(&uint8.into());
145        }
146
147        let ws_request = web_sys::Request::new_with_str_and_init(&url, &init)
148            .map_err(|e| ProxyError::BackendError(format!("failed to create request: {:?}", e)))?;
149
150        // Fetch via worker
151        let worker_req: worker::Request = ws_request.into();
152        let worker_resp = Fetch::Request(worker_req)
153            .send()
154            .await
155            .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
156
157        let status = worker_resp.status_code();
158
159        // Convert to `web_sys::Response` and read the headers BEFORE consuming
160        // the body. The `worker::Response → web_sys::Response` conversion panics
161        // once the body has been read, so reading bytes first (and converting
162        // after) blew up `send_raw` on every multipart/batch-delete response.
163        // `forward()` relies on the same before-body ordering.
164        let ws_response: web_sys::Response = worker_resp.into();
165        let resp_headers = headermap_from_js(&ws_response.headers());
166
167        // Read the (small) response body via `arrayBuffer()`.
168        let buf = wasm_bindgen_futures::JsFuture::from(
169            ws_response
170                .array_buffer()
171                .map_err(|e| ProxyError::Internal(format!("arrayBuffer() failed: {:?}", e)))?,
172        )
173        .await
174        .map_err(|e| ProxyError::Internal(format!("failed to read response: {:?}", e)))?;
175        let resp_bytes = js_sys::Uint8Array::new(&buf).to_vec();
176
177        Ok(RawResponse {
178            status,
179            headers: resp_headers,
180            body: Bytes::from(resp_bytes),
181        })
182    }
183}