// multistore_cf_workers/backend.rs
use crate::body::JsBody;
use crate::fetch_connector::FetchConnector;
use crate::headers::WsHeaders;
use crate::response::headermap_from_js;
use bytes::Bytes;
use http::HeaderMap;
use multistore::backend::ForwardResponse;
use multistore::backend::{build_signer, create_builder, ProxyBackend, RawResponse, StoreBuilder};
use multistore::error::ProxyError;
use multistore::route_handler::ForwardRequest;
use multistore::types::BucketConfig;

use object_store::list::PaginatedListStore;
use object_store::signer::Signer;
use object_store::RetryConfig;
use std::sync::Arc;
use worker::Fetch;
24
/// Backend handle for the Workers runtime.
///
/// This is a field-less unit struct: it carries no state of its own, so
/// cloning it is free. All behaviour lives in its [`ProxyBackend`]
/// implementation, which issues backend requests through the Fetch API.
#[derive(Clone)]
pub struct WorkerBackend;
31
32impl ProxyBackend for WorkerBackend {
33 type ResponseBody = web_sys::Response;
34 type Body = JsBody;
35
36 async fn forward(
37 &self,
38 request: ForwardRequest,
39 body: JsBody,
40 ) -> Result<ForwardResponse<Self::ResponseBody>, ProxyError> {
41 let js_body = body;
42
43 let init = web_sys::RequestInit::new();
45 init.set_method(request.method.as_str());
46 init.set_headers(&WsHeaders::from(&request.headers).into_inner().into());
47
48 if request.headers.contains_key(http::header::RANGE) {
50 init.set_cache(web_sys::RequestCache::NoStore);
51 }
52
53 if request.method == http::Method::PUT {
55 if let Some(stream) = js_body.stream() {
56 init.set_body(stream);
57 }
58 }
59
60 let ws_request = web_sys::Request::new_with_str_and_init(request.url.as_str(), &init)
62 .map_err(|e| ProxyError::Internal(format!("failed to create request: {:?}", e)))?;
63
64 let worker_req: worker::Request = ws_request.into();
66 let worker_resp = worker::Fetch::Request(worker_req)
67 .send()
68 .await
69 .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
70
71 let backend_ws: web_sys::Response = worker_resp.into();
73 let status = backend_ws.status();
74
75 let headers = headermap_from_js(&backend_ws.headers());
76 let content_length = headers
77 .get(http::header::CONTENT_LENGTH)
78 .and_then(|v| v.to_str().ok())
79 .and_then(|v| v.parse::<u64>().ok());
80
81 Ok(ForwardResponse {
82 status,
83 headers,
84 body: backend_ws,
85 content_length,
86 })
87 }
88
89 fn create_paginated_store(
90 &self,
91 config: &BucketConfig,
92 ) -> Result<Box<dyn PaginatedListStore>, ProxyError> {
93 let no_retry = RetryConfig {
97 max_retries: 0,
98 ..Default::default()
99 };
100 let builder = match create_builder(config)? {
101 StoreBuilder::S3(s) => {
102 StoreBuilder::S3(s.with_http_connector(FetchConnector).with_retry(no_retry))
103 }
104 #[cfg(feature = "azure")]
105 StoreBuilder::Azure(a) => {
106 StoreBuilder::Azure(a.with_http_connector(FetchConnector).with_retry(no_retry))
107 }
108 #[cfg(feature = "gcp")]
109 StoreBuilder::Gcs(g) => {
110 StoreBuilder::Gcs(g.with_http_connector(FetchConnector).with_retry(no_retry))
111 }
112 };
113 builder.build()
114 }
115
116 fn create_signer(&self, config: &BucketConfig) -> Result<Arc<dyn Signer>, ProxyError> {
117 build_signer(config)
118 }
119
120 async fn send_raw(
121 &self,
122 method: http::Method,
123 url: String,
124 headers: HeaderMap,
125 body: Bytes,
126 ) -> Result<RawResponse, ProxyError> {
127 tracing::debug!(
128 method = %method,
129 url = %url,
130 "worker: sending raw backend request via Fetch API"
131 );
132
133 let init = web_sys::RequestInit::new();
135 init.set_method(method.as_str());
136 init.set_headers(&WsHeaders::from(&headers).into_inner().into());
137
138 if !body.is_empty() {
140 let uint8 = js_sys::Uint8Array::from(body.as_ref());
141 init.set_body(&uint8.into());
142 }
143
144 let ws_request = web_sys::Request::new_with_str_and_init(&url, &init)
145 .map_err(|e| ProxyError::BackendError(format!("failed to create request: {:?}", e)))?;
146
147 let worker_req: worker::Request = ws_request.into();
149 let worker_resp = Fetch::Request(worker_req)
150 .send()
151 .await
152 .map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;
153
154 let status = worker_resp.status_code();
155
156 let ws_response: web_sys::Response = worker_resp.into();
162 let resp_headers = headermap_from_js(&ws_response.headers());
163
164 let buf = wasm_bindgen_futures::JsFuture::from(
166 ws_response
167 .array_buffer()
168 .map_err(|e| ProxyError::Internal(format!("arrayBuffer() failed: {:?}", e)))?,
169 )
170 .await
171 .map_err(|e| ProxyError::Internal(format!("failed to read response: {:?}", e)))?;
172 let resp_bytes = js_sys::Uint8Array::new(&buf).to_vec();
173
174 Ok(RawResponse {
175 status,
176 headers: resp_headers,
177 body: Bytes::from(resp_bytes),
178 })
179 }
180}