multistore_cf_workers/body.rs
1//! Zero-copy body wrapper for Cloudflare Workers.
2//!
3//! Holds the raw `ReadableStream` from an incoming request so it can be
4//! forwarded to the backend without copying through WASM memory.
5
6use bytes::Bytes;
7
8/// Zero-copy body wrapper. Holds the raw `ReadableStream` from the incoming
9/// request, passing it through the Gateway untouched for Forward requests.
10pub struct JsBody(Option<web_sys::ReadableStream>);
11
12impl JsBody {
13 /// Wrap an optional `ReadableStream` from a `web_sys::Request`.
14 pub fn new(stream: Option<web_sys::ReadableStream>) -> Self {
15 Self(stream)
16 }
17
18 /// Borrow the inner stream, if present.
19 pub fn stream(&self) -> Option<&web_sys::ReadableStream> {
20 self.0.as_ref()
21 }
22}
23
24// SAFETY: Workers is single-threaded; these are required by Gateway's generic bounds.
25unsafe impl Send for JsBody {}
26unsafe impl Sync for JsBody {}
27
28/// Materialize a `JsBody` into `Bytes` for the NeedsBody path.
29///
30/// Uses the `Response::arrayBuffer()` JS trick: wrap the stream in a
31/// `web_sys::Response`, call `.array_buffer()`, and convert via `Uint8Array`.
32/// This is only used for small multipart payloads.
33pub async fn collect_js_body(body: JsBody) -> std::result::Result<Bytes, String> {
34 match body.0 {
35 None => Ok(Bytes::new()),
36 Some(stream) => {
37 let resp = web_sys::Response::new_with_opt_readable_stream(Some(&stream))
38 .map_err(|e| format!("Response::new failed: {:?}", e))?;
39 let promise = resp
40 .array_buffer()
41 .map_err(|e| format!("arrayBuffer() failed: {:?}", e))?;
42 let buf = wasm_bindgen_futures::JsFuture::from(promise)
43 .await
44 .map_err(|e| format!("arrayBuffer await failed: {:?}", e))?;
45 let uint8 = js_sys::Uint8Array::new(&buf);
46 Ok(Bytes::from(uint8.to_vec()))
47 }
48 }
49}