Skip to main content

multistore_cf_workers/
backend.rs

1//! Backend client for the Cloudflare Workers runtime.
2//!
3//! Contains [`WorkerBackend`], which implements [`ProxyBackend`] by forwarding
4//! requests through the Workers Fetch API and reading responses as
5//! `web_sys::Response` streams.
6
7use crate::body::JsBody;
8use crate::fetch_connector::FetchConnector;
9use crate::headers::WsHeaders;
10use crate::response::headermap_from_js;
11use bytes::Bytes;
12use http::HeaderMap;
13use multistore::backend::ForwardResponse;
14use multistore::backend::{build_signer, create_builder, ProxyBackend, RawResponse, StoreBuilder};
15use multistore::error::ProxyError;
16use multistore::route_handler::ForwardRequest;
17use multistore::types::BucketConfig;
18
19use object_store::list::PaginatedListStore;
20use object_store::signer::Signer;
21use object_store::RetryConfig;
22use std::sync::Arc;
23use worker::Fetch;
24
25/// Backend for the Cloudflare Workers runtime.
26///
27/// Uses `FetchConnector` for `object_store` HTTP requests and `web_sys::fetch`
28/// for raw multipart operations.
29#[derive(Clone)]
30pub struct WorkerBackend;
31
32impl ProxyBackend for WorkerBackend {
33    type ResponseBody = web_sys::Response;
34    type Body = JsBody;
35
36    async fn forward(
37        &self,
38        request: ForwardRequest,
39        body: JsBody,
40    ) -> Result<ForwardResponse<Self::ResponseBody>, ProxyError> {
41        let js_body = body;
42
43        // Build web_sys::RequestInit.
44        let init = web_sys::RequestInit::new();
45        init.set_method(request.method.as_str());
46        init.set_headers(&WsHeaders::from(&request.headers).into_inner().into());
47
48        // Bypass Cloudflare's subrequest cache for Range requests.
49        if request.headers.contains_key(http::header::RANGE) {
50            init.set_cache(web_sys::RequestCache::NoStore);
51        }
52
53        // For PUT: attach the original ReadableStream directly (zero-copy!).
54        if request.method == http::Method::PUT {
55            if let Some(stream) = js_body.stream() {
56                init.set_body(stream);
57            }
58        }
59
60        // Build the outgoing request.
61        let ws_request = web_sys::Request::new_with_str_and_init(request.url.as_str(), &init)
62            .map_err(|e| ProxyError::Internal(format!("failed to create request: {:?}", e)))?;
63
64        // Fetch via the worker crate's Fetch API.
65        let worker_req: worker::Request = ws_request.into();
66        let worker_resp = worker::Fetch::Request(worker_req)
67            .send()
68            .await
69            .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
70
71        // Convert to web_sys::Response to access the body stream.
72        let backend_ws: web_sys::Response = worker_resp.into();
73        let status = backend_ws.status();
74
75        let headers = headermap_from_js(&backend_ws.headers());
76        let content_length = headers
77            .get(http::header::CONTENT_LENGTH)
78            .and_then(|v| v.to_str().ok())
79            .and_then(|v| v.parse::<u64>().ok());
80
81        Ok(ForwardResponse {
82            status,
83            headers,
84            body: backend_ws,
85            content_length,
86        })
87    }
88
89    fn create_paginated_store(
90        &self,
91        config: &BucketConfig,
92    ) -> Result<Box<dyn PaginatedListStore>, ProxyError> {
93        // Disable retries: object_store's retry logic uses `tokio::time::sleep`
94        // which panics on WASM (`std::time::Instant::now` is unsupported).
95        // See: https://github.com/apache/arrow-rs-object-store/issues/624
96        let no_retry = RetryConfig {
97            max_retries: 0,
98            ..Default::default()
99        };
100        let builder = match create_builder(config)? {
101            StoreBuilder::S3(s) => {
102                StoreBuilder::S3(s.with_http_connector(FetchConnector).with_retry(no_retry))
103            }
104            #[cfg(feature = "azure")]
105            StoreBuilder::Azure(a) => {
106                StoreBuilder::Azure(a.with_http_connector(FetchConnector).with_retry(no_retry))
107            }
108            #[cfg(feature = "gcp")]
109            StoreBuilder::Gcs(g) => {
110                StoreBuilder::Gcs(g.with_http_connector(FetchConnector).with_retry(no_retry))
111            }
112        };
113        builder.build()
114    }
115
116    fn create_signer(&self, config: &BucketConfig) -> Result<Arc<dyn Signer>, ProxyError> {
117        build_signer(config)
118    }
119
120    async fn send_raw(
121        &self,
122        method: http::Method,
123        url: String,
124        headers: HeaderMap,
125        body: Bytes,
126    ) -> Result<RawResponse, ProxyError> {
127        tracing::debug!(
128            method = %method,
129            url = %url,
130            "worker: sending raw backend request via Fetch API"
131        );
132
133        // Build web_sys::RequestInit
134        let init = web_sys::RequestInit::new();
135        init.set_method(method.as_str());
136        init.set_headers(&WsHeaders::from(&headers).into_inner().into());
137
138        // Set body for methods that carry one
139        if !body.is_empty() {
140            let uint8 = js_sys::Uint8Array::from(body.as_ref());
141            init.set_body(&uint8.into());
142        }
143
144        let ws_request = web_sys::Request::new_with_str_and_init(&url, &init)
145            .map_err(|e| ProxyError::BackendError(format!("failed to create request: {:?}", e)))?;
146
147        // Fetch via worker
148        let worker_req: worker::Request = ws_request.into();
149        let mut worker_resp = Fetch::Request(worker_req)
150            .send()
151            .await
152            .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
153
154        let status = worker_resp.status_code();
155
156        // Read response body as bytes (multipart responses are small)
157        let resp_bytes = worker_resp
158            .bytes()
159            .await
160            .map_err(|e| ProxyError::Internal(format!("failed to read response: {}", e)))?;
161
162        let ws_response: web_sys::Response = worker_resp.into();
163        let resp_headers = headermap_from_js(&ws_response.headers());
164
165        Ok(RawResponse {
166            status,
167            headers: resp_headers,
168            body: Bytes::from(resp_bytes),
169        })
170    }
171}