Skip to main content

dioxus_cloudflare/
handler.rs

//! Request handler that bridges Cloudflare Workers to the Dioxus Axum router.
//!
//! Two entry points are available:
//!
//! - [`handle`] — simple one-shot dispatch (no middleware)
//! - [`Handler`] — builder with before/after middleware hooks
//!
//! Both store the Worker `Env` and `Request` in thread-local context,
//! convert `worker::Request` → `http::Request`, dispatch through the
//! Dioxus Axum router, and stream the response back via `ReadableStream`.
11
12use std::convert::Infallible;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16use axum::body::Body;
17use http_body::Body as HttpBody;
18use tower::ServiceExt;
19use worker::{Request, Response, ResponseBuilder};
20
21use crate::context::{set_context, take_cookies};
22
23type BeforeHook = Box<dyn Fn(&Request) -> worker::Result<Option<Response>>>;
24type AfterHook = Box<dyn Fn(&mut Response) -> worker::Result<()>>;
25
26/// Configurable request handler with before/after middleware hooks.
27///
28/// # Example
29///
30/// ```rust,ignore
31/// use dioxus_cloudflare::Handler;
32/// use worker::*;
33///
34/// #[event(fetch)]
35/// async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
36///     unsafe { __wasm_call_ctors(); }
37///
38///     Handler::new()
39///         .before(|req| {
40///             // Short-circuit OPTIONS requests for CORS preflight
41///             if req.method() == worker::Method::Options {
42///                 let mut resp = Response::empty()?;
43///                 resp.headers_mut().set("Access-Control-Allow-Origin", "*")?;
44///                 resp.headers_mut().set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")?;
45///                 return Ok(Some(resp));
46///             }
47///             Ok(None) // continue to Axum dispatch
48///         })
49///         .after(|resp| {
50///             resp.headers_mut().set("Access-Control-Allow-Origin", "*")?;
51///             Ok(())
52///         })
53///         .handle(req, env)
54///         .await
55/// }
56/// ```
57pub struct Handler {
58    before: Vec<BeforeHook>,
59    after: Vec<AfterHook>,
60}
61
62impl Handler {
63    /// Create a new handler with no middleware hooks.
64    #[must_use]
65    pub fn new() -> Self {
66        Self {
67            before: Vec::new(),
68            after: Vec::new(),
69        }
70    }
71
72    /// Add a before-dispatch hook.
73    ///
74    /// Runs after context is set (so `cf::env()`, `cf::d1()`, etc. work),
75    /// before Axum dispatch. Return `Ok(None)` to continue,
76    /// `Ok(Some(resp))` to short-circuit with a custom response.
77    ///
78    /// Hooks run in the order they were added.
79    #[must_use]
80    pub fn before(
81        mut self,
82        hook: impl Fn(&Request) -> worker::Result<Option<Response>> + 'static,
83    ) -> Self {
84        self.before.push(Box::new(hook));
85        self
86    }
87
88    /// Add an after-dispatch hook.
89    ///
90    /// Runs after cookies are applied, before returning the response.
91    /// Use this to add headers, log, or modify the final response.
92    ///
93    /// After hooks also run on short-circuited responses from before hooks.
94    /// Hooks run in the order they were added.
95    #[must_use]
96    pub fn after(
97        mut self,
98        hook: impl Fn(&mut Response) -> worker::Result<()> + 'static,
99    ) -> Self {
100        self.after.push(Box::new(hook));
101        self
102    }
103
104    /// Dispatch the request through registered `#[server]` functions.
105    ///
106    /// This is the async entry point — call it from `#[event(fetch)]`.
107    #[allow(clippy::missing_errors_doc)]
108    pub async fn handle(&self, req: Request, env: worker::Env) -> worker::Result<Response> {
109        // Store env + request in thread-local for cf::env() / cf::req()
110        let req_clone = req
111            .clone()
112            .map_err(|e| worker::Error::RustError(format!("request clone failed: {e}")))?;
113        set_context(env, req_clone);
114
115        // Run before hooks — short-circuit if one returns a response
116        for hook in &self.before {
117            if let Some(resp) = hook(&req)? {
118                return self.finalize(resp);
119            }
120        }
121
122        // Convert worker::Request → http::Request
123        let http_req = worker_req_to_http(req).await?;
124
125        // Dispatch through the Dioxus Axum router
126        let http_resp = dispatch(http_req).await?;
127
128        // Convert http::Response → worker::Response (streaming)
129        let worker_resp = http_to_worker_resp(http_resp)?;
130
131        self.finalize(worker_resp)
132    }
133
134    /// Apply queued cookies and run after hooks on the final response.
135    fn finalize(&self, mut resp: Response) -> worker::Result<Response> {
136        // Apply any cookies queued by cf::set_cookie() / cf::clear_cookie()
137        for cookie in take_cookies() {
138            resp.headers_mut()
139                .append("Set-Cookie", &cookie)
140                .map_err(|e| worker::Error::RustError(format!("cookie append failed: {e}")))?;
141        }
142
143        // Run after hooks
144        for hook in &self.after {
145            hook(&mut resp)?;
146        }
147
148        // Context is NOT cleared here — with streaming, the response body may
149        // still be generating after handle() returns. The next request's
150        // set_context() overwrites the old values, which is safe because Workers
151        // handle one request at a time per isolate.
152
153        Ok(resp)
154    }
155}
156
157impl Default for Handler {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163/// Handle an incoming Cloudflare Worker request by dispatching it through
164/// the Dioxus server function router.
165///
166/// This is a convenience wrapper around [`Handler::new().handle()`](Handler::handle).
167/// Use [`Handler`] directly if you need before/after middleware hooks.
168///
169/// # Example
170///
171/// ```rust,ignore
172/// use worker::*;
173///
174/// extern "C" { fn __wasm_call_ctors(); }
175///
176/// #[event(fetch)]
177/// async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
178///     // Required: initialize inventory for server function registration.
179///     // SAFETY: Called once per Worker cold start. The `inventory` crate
180///     // requires this in WASM to register #[server] functions.
181///     unsafe { __wasm_call_ctors(); }
182///
183///     dioxus_cloudflare::handle(req, env).await
184/// }
185/// ```
186#[allow(clippy::missing_errors_doc)]
187pub async fn handle(req: Request, env: worker::Env) -> worker::Result<Response> {
188    Handler::new().handle(req, env).await
189}
190
191/// Convert a `worker::Request` into an `http::Request<Body>` that Axum
192/// can process.
193async fn worker_req_to_http(mut req: Request) -> worker::Result<http::Request<Body>> {
194    let method = match req.method() {
195        worker::Method::Get => http::Method::GET,
196        worker::Method::Post => http::Method::POST,
197        worker::Method::Put => http::Method::PUT,
198        worker::Method::Delete => http::Method::DELETE,
199        worker::Method::Options => http::Method::OPTIONS,
200        worker::Method::Head => http::Method::HEAD,
201        worker::Method::Patch => http::Method::PATCH,
202        _ => http::Method::GET,
203    };
204
205    let url = req.url()?;
206    let uri: http::Uri = url
207        .as_str()
208        .parse()
209        .map_err(|e| worker::Error::RustError(format!("invalid URI: {e}")))?;
210
211    let mut builder = http::Request::builder().method(method).uri(uri);
212
213    // Copy headers from worker request to http request
214    for (key, value) in req.headers() {
215        builder = builder.header(&key, &value);
216    }
217
218    // Read body as bytes and wrap in axum Body
219    let body_bytes = req.bytes().await?;
220
221    builder
222        .body(Body::from(body_bytes))
223        .map_err(|e| worker::Error::RustError(format!("request build failed: {e}")))
224}
225
226/// Build an Axum router containing all registered `#[server]` functions,
227/// then dispatch the request through it.
228///
229/// Returns the response with the body passed through (not collected into
230/// bytes), allowing streaming responses to flow through to the Worker.
231async fn dispatch(req: http::Request<Body>) -> worker::Result<http::Response<Body>> {
232    // Collect all #[server] functions registered by inventory and add
233    // them as routes to an Axum router.
234    //
235    // ServerFunction::collect() returns all functions that were registered
236    // when __wasm_call_ctors() ran. Each function knows its own path
237    // (e.g., "/api/ping") and HTTP method.
238    let mut router: axum::Router<dioxus_server::FullstackState> = axum::Router::new();
239    for func in dioxus_server::ServerFunction::collect() {
240        router = router.route(func.path(), func.method_router());
241    }
242
243    // Convert Router<FullstackState> → Router<()> using a headless state
244    // (no SSR renderer needed — we only serve API endpoints).
245    let router = router.with_state(dioxus_server::FullstackState::headless());
246
247    // Dispatch the request through the router. Router's Service impl has
248    // error type Infallible — it always produces a response (possibly 404).
249    Ok(router
250        .oneshot(req)
251        .await
252        .unwrap_or_else(|e: Infallible| match e {}))
253}
254
255/// Adapter that converts an `axum::body::Body` into a
256/// `Stream<Item = Result<Vec<u8>, worker::Error>>`.
257///
258/// This satisfies `TryStream<Ok = Vec<u8>, Error = worker::Error>` via the
259/// blanket impl, matching `ResponseBuilder::from_stream()`'s bounds.
260///
261/// `axum::body::Body` is `Unpin` (wraps `Pin<Box<...>>`), so no pin
262/// projection is needed.
263struct AxumBodyStream(Body);
264
265impl futures_core::Stream for AxumBodyStream {
266    type Item = Result<Vec<u8>, worker::Error>;
267
268    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269        loop {
270            match Pin::new(&mut self.0).poll_frame(cx) {
271                Poll::Ready(Some(Ok(frame))) => {
272                    if let Ok(data) = frame.into_data() {
273                        return Poll::Ready(Some(Ok(data.to_vec())));
274                    }
275                    // Trailers frame — skip, poll for more data
276                }
277                Poll::Ready(Some(Err(e))) => {
278                    return Poll::Ready(Some(Err(worker::Error::RustError(format!(
279                        "body frame error: {e}"
280                    )))));
281                }
282                Poll::Ready(None) => return Poll::Ready(None),
283                Poll::Pending => return Poll::Pending,
284            }
285        }
286    }
287}
288
289/// Convert an `http::Response<Body>` into a `worker::Response` backed by
290/// a `ReadableStream`, enabling streaming response bodies.
291fn http_to_worker_resp(resp: http::Response<Body>) -> worker::Result<Response> {
292    let (parts, body) = resp.into_parts();
293
294    let mut worker_resp = ResponseBuilder::new()
295        .with_status(parts.status.as_u16())
296        .from_stream(AxumBodyStream(body))?;
297
298    // Copy response headers
299    for (key, value) in &parts.headers {
300        if let Ok(v) = value.to_str() {
301            worker_resp
302                .headers_mut()
303                .set(key.as_str(), v)
304                .map_err(|e| worker::Error::RustError(format!("header set failed: {e}")))?;
305        }
306    }
307
308    Ok(worker_resp)
309}