multistore_cf_workers/
backend.rs1use crate::body::JsBody;
8use crate::fetch_connector::FetchConnector;
9use crate::headers::WsHeaders;
10use crate::response::headermap_from_js;
11use bytes::Bytes;
12use http::HeaderMap;
13use multistore::backend::ForwardResponse;
14use multistore::backend::{build_signer, create_builder, ProxyBackend, RawResponse, StoreBuilder};
15use multistore::error::ProxyError;
16use multistore::route_handler::ForwardRequest;
17use multistore::types::BucketConfig;
18
19use object_store::list::PaginatedListStore;
20use object_store::signer::Signer;
21use object_store::RetryConfig;
22use std::sync::Arc;
23use worker::Fetch;
24
25#[derive(Clone)]
30pub struct WorkerBackend;
31
32impl ProxyBackend for WorkerBackend {
33 type ResponseBody = web_sys::Response;
34 type Body = JsBody;
35
36 async fn forward(
37 &self,
38 request: ForwardRequest,
39 body: JsBody,
40 ) -> Result<ForwardResponse<Self::ResponseBody>, ProxyError> {
41 let js_body = body;
42
43 let init = web_sys::RequestInit::new();
45 init.set_method(request.method.as_str());
46 init.set_headers(&WsHeaders::from(&request.headers).into_inner().into());
47
48 if request.should_bypass_cache() {
53 init.set_cache(web_sys::RequestCache::NoStore);
54 }
55
56 if request.method == http::Method::PUT {
58 if let Some(stream) = js_body.stream() {
59 init.set_body(stream);
60 }
61 }
62
63 let ws_request = web_sys::Request::new_with_str_and_init(request.url.as_str(), &init)
65 .map_err(|e| ProxyError::Internal(format!("failed to create request: {:?}", e)))?;
66
67 let worker_req: worker::Request = ws_request.into();
69 let worker_resp = worker::Fetch::Request(worker_req)
70 .send()
71 .await
72 .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
73
74 let backend_ws: web_sys::Response = worker_resp.into();
76 let status = backend_ws.status();
77
78 let headers = headermap_from_js(&backend_ws.headers());
79 let content_length = headers
80 .get(http::header::CONTENT_LENGTH)
81 .and_then(|v| v.to_str().ok())
82 .and_then(|v| v.parse::<u64>().ok());
83
84 Ok(ForwardResponse {
85 status,
86 headers,
87 body: backend_ws,
88 content_length,
89 })
90 }
91
92 fn create_paginated_store(
93 &self,
94 config: &BucketConfig,
95 ) -> Result<Box<dyn PaginatedListStore>, ProxyError> {
96 let no_retry = RetryConfig {
100 max_retries: 0,
101 ..Default::default()
102 };
103 let builder = match create_builder(config)? {
104 StoreBuilder::S3(s) => {
105 StoreBuilder::S3(s.with_http_connector(FetchConnector).with_retry(no_retry))
106 }
107 #[cfg(feature = "azure")]
108 StoreBuilder::Azure(a) => {
109 StoreBuilder::Azure(a.with_http_connector(FetchConnector).with_retry(no_retry))
110 }
111 #[cfg(feature = "gcp")]
112 StoreBuilder::Gcs(g) => {
113 StoreBuilder::Gcs(g.with_http_connector(FetchConnector).with_retry(no_retry))
114 }
115 };
116 builder.build()
117 }
118
119 fn create_signer(&self, config: &BucketConfig) -> Result<Arc<dyn Signer>, ProxyError> {
120 build_signer(config)
121 }
122
123 async fn send_raw(
124 &self,
125 method: http::Method,
126 url: String,
127 headers: HeaderMap,
128 body: Bytes,
129 ) -> Result<RawResponse, ProxyError> {
130 tracing::debug!(
131 method = %method,
132 url = %url,
133 "worker: sending raw backend request via Fetch API"
134 );
135
136 let init = web_sys::RequestInit::new();
138 init.set_method(method.as_str());
139 init.set_headers(&WsHeaders::from(&headers).into_inner().into());
140
141 if !body.is_empty() {
143 let uint8 = js_sys::Uint8Array::from(body.as_ref());
144 init.set_body(&uint8.into());
145 }
146
147 let ws_request = web_sys::Request::new_with_str_and_init(&url, &init)
148 .map_err(|e| ProxyError::BackendError(format!("failed to create request: {:?}", e)))?;
149
150 let worker_req: worker::Request = ws_request.into();
152 let worker_resp = Fetch::Request(worker_req)
153 .send()
154 .await
155 .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
156
157 let status = worker_resp.status_code();
158
159 let ws_response: web_sys::Response = worker_resp.into();
165 let resp_headers = headermap_from_js(&ws_response.headers());
166
167 let buf = wasm_bindgen_futures::JsFuture::from(
169 ws_response
170 .array_buffer()
171 .map_err(|e| ProxyError::Internal(format!("arrayBuffer() failed: {:?}", e)))?,
172 )
173 .await
174 .map_err(|e| ProxyError::Internal(format!("failed to read response: {:?}", e)))?;
175 let resp_bytes = js_sys::Uint8Array::new(&buf).to_vec();
176
177 Ok(RawResponse {
178 status,
179 headers: resp_headers,
180 body: Bytes::from(resp_bytes),
181 })
182 }
183}