dioxus_fullstack_core/
streaming.rs

1use crate::{HttpError, ServerFnError};
2use axum_core::extract::FromRequest;
3use axum_core::response::IntoResponse;
4use dioxus_core::{CapturedError, ReactiveContext};
5use http::StatusCode;
6use http::{request::Parts, HeaderMap};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12/// The context provided by dioxus fullstack for server-side rendering.
13///
14/// This context will only be set on the server during the initial streaming response
15/// and inside server functions.
16#[derive(Clone, Debug)]
17pub struct FullstackContext {
18    // We expose the lock for request headers directly so it needs to be in a separate lock
19    request_headers: Arc<RwLock<http::request::Parts>>,
20
21    // The rest of the fields are only held internally, so we can group them together
22    lock: Arc<RwLock<FullstackContextInner>>,
23}
24
25// `FullstackContext` is always set when either
26// 1. rendering the app via SSR
27// 2. handling a server function request
28tokio::task_local! {
29    static FULLSTACK_CONTEXT: FullstackContext;
30}
31
32pub struct FullstackContextInner {
33    current_status: StreamingStatus,
34    current_status_subscribers: HashSet<ReactiveContext>,
35    response_headers: Option<HeaderMap>,
36    route_http_status: HttpError,
37    route_http_status_subscribers: HashSet<ReactiveContext>,
38}
39
40impl Debug for FullstackContextInner {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("FullstackContextInner")
43            .field("current_status", &self.current_status)
44            .field("response_headers", &self.response_headers)
45            .field("route_http_status", &self.route_http_status)
46            .finish()
47    }
48}
49
50impl PartialEq for FullstackContext {
51    fn eq(&self, other: &Self) -> bool {
52        Arc::ptr_eq(&self.lock, &other.lock)
53            && Arc::ptr_eq(&self.request_headers, &other.request_headers)
54    }
55}
56
57impl FullstackContext {
58    /// Create a new streaming context. You should not need to call this directly. Dioxus fullstack will
59    /// provide this context for you.
60    pub fn new(parts: Parts) -> Self {
61        Self {
62            request_headers: RwLock::new(parts).into(),
63            lock: RwLock::new(FullstackContextInner {
64                current_status: StreamingStatus::RenderingInitialChunk,
65                current_status_subscribers: Default::default(),
66                route_http_status: HttpError {
67                    status: http::StatusCode::OK,
68                    message: None,
69                },
70                route_http_status_subscribers: Default::default(),
71                response_headers: Some(HeaderMap::new()),
72            })
73            .into(),
74        }
75    }
76
77    /// Commit the initial chunk of the response. This will be called automatically if you are using the
78    /// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
79    /// to call this manually to start the streaming part of the response.
80    ///
81    /// Once this method has been called, the http response parts can no longer be modified.
82    pub fn commit_initial_chunk(&mut self) {
83        let mut lock = self.lock.write();
84        lock.current_status = StreamingStatus::InitialChunkCommitted;
85
86        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
87        #[allow(clippy::mutable_key_type)]
88        let subscribers = std::mem::take(&mut lock.current_status_subscribers);
89        for subscriber in subscribers {
90            subscriber.mark_dirty();
91        }
92    }
93
94    /// Get the current status of the streaming response. This method is reactive and will cause
95    /// the current reactive context to rerun when the status changes.
96    pub fn streaming_state(&self) -> StreamingStatus {
97        let mut lock = self.lock.write();
98        // Register the current reactive context as a subscriber to changes in the streaming status
99        if let Some(ctx) = ReactiveContext::current() {
100            lock.current_status_subscribers.insert(ctx);
101        }
102        lock.current_status
103    }
104
105    /// Access the http request parts mutably. This will allow you to modify headers and other parts of the request.
106    pub fn parts_mut(&self) -> parking_lot::RwLockWriteGuard<'_, http::request::Parts> {
107        self.request_headers.write()
108    }
109
110    /// Run a future within the scope of this FullstackContext.
111    pub async fn scope<F, R>(self, fut: F) -> R
112    where
113        F: std::future::Future<Output = R>,
114    {
115        FULLSTACK_CONTEXT.scope(self, fut).await
116    }
117
118    /// Extract an extension from the current request.
119    pub fn extension<T: Clone + Send + Sync + 'static>(&self) -> Option<T> {
120        let lock = self.request_headers.read();
121        lock.extensions.get::<T>().cloned()
122    }
123
124    /// Extract an axum extractor from the current request.
125    ///
126    /// The body of the request is always empty when using this method, as the body can only be consumed once in the server
127    /// function extractors.
128    pub async fn extract<T: FromRequest<Self, M>, M>() -> Result<T, ServerFnError> {
129        let this = Self::current().unwrap_or_else(|| {
130            // Create a dummy context if one doesn't exist, making the function usable outside of a request context
131            FullstackContext::new(
132                axum_core::extract::Request::builder()
133                    .method("GET")
134                    .uri("/")
135                    .header("X-Dummy-Header", "true")
136                    .body(())
137                    .unwrap()
138                    .into_parts()
139                    .0,
140            )
141        });
142
143        let parts = this.request_headers.read().clone();
144        let request = axum_core::extract::Request::from_parts(parts, Default::default());
145        match T::from_request(request, &this).await {
146            Ok(res) => Ok(res),
147            Err(err) => {
148                let resp = err.into_response();
149                Err(ServerFnError::from_axum_response(resp).await)
150            }
151        }
152    }
153
154    /// Get the current `FullstackContext` if it exists. This will return `None` if called on the client
155    /// or outside of a streaming response on the server or server function.
156    pub fn current() -> Option<Self> {
157        // Try to get the context from the task local (for server functions)
158        if let Ok(context) = FULLSTACK_CONTEXT.try_get() {
159            return Some(context);
160        }
161
162        // Otherwise, try to get it from the dioxus runtime context (for streaming SSR)
163        if let Some(rt) = dioxus_core::Runtime::try_current() {
164            let id = rt.try_current_scope_id()?;
165            if let Some(ctx) = rt.consume_context::<FullstackContext>(id) {
166                return Some(ctx);
167            }
168        }
169
170        None
171    }
172
173    /// Get the current HTTP status for the route. This will default to 200 OK, but can be modified
174    /// by calling `FullstackContext::commit_error_status` with an error.
175    pub fn current_http_status(&self) -> HttpError {
176        let mut lock = self.lock.write();
177        // Register the current reactive context as a subscriber to changes in the http status
178        if let Some(ctx) = ReactiveContext::current() {
179            lock.route_http_status_subscribers.insert(ctx);
180        }
181        lock.route_http_status.clone()
182    }
183
184    pub fn set_current_http_status(&mut self, status: HttpError) {
185        let mut lock = self.lock.write();
186        lock.route_http_status = status;
187        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
188        #[allow(clippy::mutable_key_type)]
189        let subscribers = std::mem::take(&mut lock.route_http_status_subscribers);
190        for subscriber in subscribers {
191            subscriber.mark_dirty();
192        }
193    }
194
195    /// Add a header to the response. This will be sent to the client when the response is committed.
196    pub fn add_response_header(
197        &self,
198        key: impl Into<http::header::HeaderName>,
199        value: impl Into<http::header::HeaderValue>,
200    ) {
201        let mut lock = self.lock.write();
202        if let Some(headers) = lock.response_headers.as_mut() {
203            headers.insert(key.into(), value.into());
204        }
205    }
206
207    /// Take the response headers out of the context. This will leave the context without any headers,
208    /// so it should only be called once when the response is being committed.
209    pub fn take_response_headers(&self) -> Option<HeaderMap> {
210        let mut lock = self.lock.write();
211        lock.response_headers.take()
212    }
213
214    /// Set the current HTTP status for the route. This will be used when committing the response
215    /// to the client.
216    pub fn commit_http_status(status: StatusCode, message: Option<String>) {
217        if let Some(mut ctx) = Self::current() {
218            ctx.set_current_http_status(HttpError { status, message });
219        }
220    }
221
222    /// Commit the CapturedError as the current HTTP status for the route.
223    /// This will attempt to downcast the error to known types and set the appropriate
224    /// status code. If the error type is unknown, it will default to
225    /// `StatusCode::INTERNAL_SERVER_ERROR`.
226    pub fn commit_error_status(error: impl Into<CapturedError>) -> HttpError {
227        let error = error.into();
228        let status = status_code_from_error(&error);
229        let http_error = HttpError {
230            status,
231            message: Some(error.to_string()),
232        };
233
234        if let Some(mut ctx) = Self::current() {
235            ctx.set_current_http_status(http_error.clone());
236        }
237
238        http_error
239    }
240}
241
242/// The status of the streaming response
243#[derive(Clone, Copy, Debug, PartialEq)]
244pub enum StreamingStatus {
245    /// The initial chunk is still being rendered. The http response parts can still be modified at this point.
246    RenderingInitialChunk,
247
248    /// The initial chunk has been committed and the response is now streaming. The http response parts
249    /// have already been sent to the client and can no longer be modified.
250    InitialChunkCommitted,
251}
252
253/// Commit the initial chunk of the response. This will be called automatically if you are using the
254/// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
255/// to call this manually to start the streaming part of the response.
256///
257/// On the client, this will do nothing.
258///
259/// # Example
260/// ```rust, no_run
261/// # use dioxus::prelude::*;
262/// # use dioxus_fullstack_core::*;
263/// # fn Children() -> Element { unimplemented!() }
264/// fn App() -> Element {
265///     // This will start streaming immediately after the current render is complete.
266///     use_hook(commit_initial_chunk);
267///
268///     rsx! { Children {} }
269/// }
270/// ```
271pub fn commit_initial_chunk() {
272    crate::history::finalize_route();
273    if let Some(mut streaming) = FullstackContext::current() {
274        streaming.commit_initial_chunk();
275    }
276}
277
278/// Extract an axum extractor from the current request.
279#[deprecated(note = "Use FullstackContext::extract instead", since = "0.7.0")]
280pub fn extract<T: FromRequest<FullstackContext, M>, M>(
281) -> impl std::future::Future<Output = Result<T, ServerFnError>> {
282    FullstackContext::extract::<T, M>()
283}
284
285/// Get the current status of the streaming response. This method is reactive and will cause
286/// the current reactive context to rerun when the status changes.
287///
288/// On the client, this will always return `StreamingStatus::InitialChunkCommitted`.
289///
290/// # Example
291/// ```rust, no_run
292/// # use dioxus::prelude::*;
293/// # use dioxus_fullstack_core::*;
294/// #[component]
295/// fn MetaTitle(title: String) -> Element {
296///     // If streaming has already started, warn the user that the meta tag will not show
297///     // up in the initial chunk.
298///     use_hook(|| {
299///         if current_status() == StreamingStatus::InitialChunkCommitted {
300///            dioxus::logger::tracing::warn!("Since `MetaTitle` was rendered after the initial chunk was committed, the meta tag will not show up in the head without javascript enabled.");
301///         }
302///     });
303///
304///     rsx! { meta { property: "og:title", content: title } }
305/// }
306/// ```
307pub fn current_status() -> StreamingStatus {
308    if let Some(streaming) = FullstackContext::current() {
309        streaming.streaming_state()
310    } else {
311        StreamingStatus::InitialChunkCommitted
312    }
313}
314
315/// Convert a `CapturedError` into an appropriate HTTP status code.
316///
317/// This will attempt to downcast the error to known types and return a corresponding status code.
318/// If the error type is unknown, it will default to `StatusCode::INTERNAL_SERVER_ERROR`.
319pub fn status_code_from_error(error: &CapturedError) -> StatusCode {
320    if let Some(err) = error.downcast_ref::<ServerFnError>() {
321        match err {
322            ServerFnError::ServerError { code, .. } => {
323                return StatusCode::from_u16(*code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
324            }
325            _ => return StatusCode::INTERNAL_SERVER_ERROR,
326        }
327    }
328
329    if let Some(err) = error.downcast_ref::<StatusCode>() {
330        return *err;
331    }
332
333    if let Some(err) = error.downcast_ref::<HttpError>() {
334        return err.status;
335    }
336
337    StatusCode::INTERNAL_SERVER_ERROR
338}