dioxus_fullstack_core/
streaming.rs

1use crate::{HttpError, ServerFnError};
2use axum_core::extract::FromRequest;
3use axum_core::response::IntoResponse;
4use dioxus_core::{CapturedError, ReactiveContext};
5use http::StatusCode;
6use http::{request::Parts, HeaderMap};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12/// The context provided by dioxus fullstack for server-side rendering.
13///
14/// This context will only be set on the server during the initial streaming response
15/// and inside server functions.
16#[derive(Clone, Debug)]
17pub struct FullstackContext {
18    // We expose the lock for request headers directly so it needs to be in a separate lock
19    request_headers: Arc<RwLock<http::request::Parts>>,
20
21    // The rest of the fields are only held internally, so we can group them together
22    lock: Arc<RwLock<FullstackContextInner>>,
23}
24
// Task-local slot holding the `FullstackContext` of the request being handled.
//
// `FullstackContext` is always set when either
// 1. rendering the app via SSR
// 2. handling a server function request
//
// The slot is populated by `FullstackContext::scope` and read by `FullstackContext::current`.
tokio::task_local! {
    static FULLSTACK_CONTEXT: FullstackContext;
}
31
/// Internal, lock-protected state of a [`FullstackContext`].
pub struct FullstackContextInner {
    // Whether the initial chunk is still rendering or has already been committed
    current_status: StreamingStatus,
    // Reactive contexts to mark dirty the next time `current_status` changes
    current_status_subscribers: HashSet<ReactiveContext>,
    // Headers to send with the response; `None` once they have been taken
    response_headers: Option<HeaderMap>,
    // HTTP status (and optional message) currently recorded for the route
    route_http_status: HttpError,
    // Reactive contexts to mark dirty the next time `route_http_status` changes
    route_http_status_subscribers: HashSet<ReactiveContext>,
}
39
40impl Debug for FullstackContextInner {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("FullstackContextInner")
43            .field("current_status", &self.current_status)
44            .field("response_headers", &self.response_headers)
45            .field("route_http_status", &self.route_http_status)
46            .finish()
47    }
48}
49
50impl PartialEq for FullstackContext {
51    fn eq(&self, other: &Self) -> bool {
52        Arc::ptr_eq(&self.lock, &other.lock)
53            && Arc::ptr_eq(&self.request_headers, &other.request_headers)
54    }
55}
56
57impl FullstackContext {
58    /// Create a new streaming context. You should not need to call this directly. Dioxus fullstack will
59    /// provide this context for you.
60    pub fn new(parts: Parts) -> Self {
61        Self {
62            request_headers: RwLock::new(parts).into(),
63            lock: RwLock::new(FullstackContextInner {
64                current_status: StreamingStatus::RenderingInitialChunk,
65                current_status_subscribers: Default::default(),
66                route_http_status: HttpError {
67                    status: http::StatusCode::OK,
68                    message: None,
69                },
70                route_http_status_subscribers: Default::default(),
71                response_headers: Some(HeaderMap::new()),
72            })
73            .into(),
74        }
75    }
76
77    /// Commit the initial chunk of the response. This will be called automatically if you are using the
78    /// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
79    /// to call this manually to start the streaming part of the response.
80    ///
81    /// Once this method has been called, the http response parts can no longer be modified.
82    pub fn commit_initial_chunk(&mut self) {
83        let mut lock = self.lock.write();
84        lock.current_status = StreamingStatus::InitialChunkCommitted;
85
86        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
87        #[allow(clippy::mutable_key_type)]
88        let subscribers = std::mem::take(&mut lock.current_status_subscribers);
89        for subscriber in subscribers {
90            subscriber.mark_dirty();
91        }
92    }
93
94    /// Get the current status of the streaming response. This method is reactive and will cause
95    /// the current reactive context to rerun when the status changes.
96    pub fn streaming_state(&self) -> StreamingStatus {
97        let mut lock = self.lock.write();
98        // Register the current reactive context as a subscriber to changes in the streaming status
99        if let Some(ctx) = ReactiveContext::current() {
100            lock.current_status_subscribers.insert(ctx);
101        }
102        lock.current_status
103    }
104
105    /// Access the http request parts mutably. This will allow you to modify headers and other parts of the request.
106    pub fn parts_mut(&self) -> parking_lot::RwLockWriteGuard<'_, http::request::Parts> {
107        self.request_headers.write()
108    }
109
110    /// Run a future within the scope of this FullstackContext.
111    pub async fn scope<F, R>(self, fut: F) -> R
112    where
113        F: std::future::Future<Output = R>,
114    {
115        FULLSTACK_CONTEXT.scope(self, fut).await
116    }
117
118    /// Extract an extension from the current request.
119    pub fn extension<T: Clone + Send + Sync + 'static>(&self) -> Option<T> {
120        let lock = self.request_headers.read();
121        lock.extensions.get::<T>().cloned()
122    }
123
124    /// Extract an axum extractor from the current request.
125    ///
126    /// The body of the request is always empty when using this method, as the body can only be consumed once in the server
127    /// function extractors.
128    pub async fn extract<T: FromRequest<Self, M>, M>() -> Result<T, ServerFnError> {
129        let this = Self::current()
130            .ok_or_else(|| ServerFnError::new("No FullstackContext found".to_string()))?;
131
132        let parts = this.request_headers.read().clone();
133        let request = axum_core::extract::Request::from_parts(parts, Default::default());
134        match T::from_request(request, &this).await {
135            Ok(res) => Ok(res),
136            Err(err) => {
137                let resp = err.into_response();
138                Err(ServerFnError::from_axum_response(resp).await)
139            }
140        }
141    }
142
143    /// Get the current `FullstackContext` if it exists. This will return `None` if called on the client
144    /// or outside of a streaming response on the server or server function.
145    pub fn current() -> Option<Self> {
146        // Try to get the context from the task local (for server functions)
147        if let Ok(context) = FULLSTACK_CONTEXT.try_get() {
148            return Some(context);
149        }
150
151        // Otherwise, try to get it from the dioxus runtime context (for streaming SSR)
152        if let Some(rt) = dioxus_core::Runtime::try_current() {
153            let id = rt.try_current_scope_id()?;
154            if let Some(ctx) = rt.consume_context::<FullstackContext>(id) {
155                return Some(ctx);
156            }
157        }
158
159        None
160    }
161
162    /// Get the current HTTP status for the route. This will default to 200 OK, but can be modified
163    /// by calling `FullstackContext::commit_error_status` with an error.
164    pub fn current_http_status(&self) -> HttpError {
165        let mut lock = self.lock.write();
166        // Register the current reactive context as a subscriber to changes in the http status
167        if let Some(ctx) = ReactiveContext::current() {
168            lock.route_http_status_subscribers.insert(ctx);
169        }
170        lock.route_http_status.clone()
171    }
172
173    pub fn set_current_http_status(&mut self, status: HttpError) {
174        let mut lock = self.lock.write();
175        lock.route_http_status = status;
176        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
177        #[allow(clippy::mutable_key_type)]
178        let subscribers = std::mem::take(&mut lock.route_http_status_subscribers);
179        for subscriber in subscribers {
180            subscriber.mark_dirty();
181        }
182    }
183
184    /// Add a header to the response. This will be sent to the client when the response is committed.
185    pub fn add_response_header(
186        &self,
187        key: impl Into<http::header::HeaderName>,
188        value: impl Into<http::header::HeaderValue>,
189    ) {
190        let mut lock = self.lock.write();
191        if let Some(headers) = lock.response_headers.as_mut() {
192            headers.insert(key.into(), value.into());
193        }
194    }
195
196    /// Take the response headers out of the context. This will leave the context without any headers,
197    /// so it should only be called once when the response is being committed.
198    pub fn take_response_headers(&self) -> Option<HeaderMap> {
199        let mut lock = self.lock.write();
200        lock.response_headers.take()
201    }
202
203    /// Set the current HTTP status for the route. This will be used when committing the response
204    /// to the client.
205    pub fn commit_http_status(status: StatusCode, message: Option<String>) {
206        if let Some(mut ctx) = Self::current() {
207            ctx.set_current_http_status(HttpError { status, message });
208        }
209    }
210
211    /// Commit the CapturedError as the current HTTP status for the route.
212    /// This will attempt to downcast the error to known types and set the appropriate
213    /// status code. If the error type is unknown, it will default to
214    /// `StatusCode::INTERNAL_SERVER_ERROR`.
215    pub fn commit_error_status(error: impl Into<CapturedError>) -> HttpError {
216        let error = error.into();
217        let status = status_code_from_error(&error);
218        let http_error = HttpError {
219            status,
220            message: Some(error.to_string()),
221        };
222
223        if let Some(mut ctx) = Self::current() {
224            ctx.set_current_http_status(http_error.clone());
225        }
226
227        http_error
228    }
229}
230
/// The status of the streaming response
///
/// The enum carries no data, so besides `PartialEq` it also implements `Eq` and `Hash`,
/// which lets it be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StreamingStatus {
    /// The initial chunk is still being rendered. The http response parts can still be modified at this point.
    RenderingInitialChunk,

    /// The initial chunk has been committed and the response is now streaming. The http response parts
    /// have already been sent to the client and can no longer be modified.
    InitialChunkCommitted,
}
241
242/// Commit the initial chunk of the response. This will be called automatically if you are using the
243/// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
244/// to call this manually to start the streaming part of the response.
245///
246/// On the client, this will do nothing.
247///
248/// # Example
249/// ```rust, no_run
250/// # use dioxus::prelude::*;
251/// # use dioxus_fullstack_core::*;
252/// # fn Children() -> Element { unimplemented!() }
253/// fn App() -> Element {
254///     // This will start streaming immediately after the current render is complete.
255///     use_hook(commit_initial_chunk);
256///
257///     rsx! { Children {} }
258/// }
259/// ```
260pub fn commit_initial_chunk() {
261    crate::history::finalize_route();
262    if let Some(mut streaming) = FullstackContext::current() {
263        streaming.commit_initial_chunk();
264    }
265}
266
267/// Extract an axum extractor from the current request.
268#[deprecated(note = "Use FullstackContext::extract instead", since = "0.7.0")]
269pub fn extract<T: FromRequest<FullstackContext, M>, M>(
270) -> impl std::future::Future<Output = Result<T, ServerFnError>> {
271    FullstackContext::extract::<T, M>()
272}
273
274/// Get the current status of the streaming response. This method is reactive and will cause
275/// the current reactive context to rerun when the status changes.
276///
277/// On the client, this will always return `StreamingStatus::InitialChunkCommitted`.
278///
279/// # Example
280/// ```rust, no_run
281/// # use dioxus::prelude::*;
282/// # use dioxus_fullstack_core::*;
283/// #[component]
284/// fn MetaTitle(title: String) -> Element {
285///     // If streaming has already started, warn the user that the meta tag will not show
286///     // up in the initial chunk.
287///     use_hook(|| {
288///         if current_status() == StreamingStatus::InitialChunkCommitted {
289///            dioxus::logger::tracing::warn!("Since `MetaTitle` was rendered after the initial chunk was committed, the meta tag will not show up in the head without javascript enabled.");
290///         }
291///     });
292///
293///     rsx! { meta { property: "og:title", content: title } }
294/// }
295/// ```
296pub fn current_status() -> StreamingStatus {
297    if let Some(streaming) = FullstackContext::current() {
298        streaming.streaming_state()
299    } else {
300        StreamingStatus::InitialChunkCommitted
301    }
302}
303
304/// Convert a `CapturedError` into an appropriate HTTP status code.
305///
306/// This will attempt to downcast the error to known types and return a corresponding status code.
307/// If the error type is unknown, it will default to `StatusCode::INTERNAL_SERVER_ERROR`.
308pub fn status_code_from_error(error: &CapturedError) -> StatusCode {
309    if let Some(err) = error.downcast_ref::<ServerFnError>() {
310        match err {
311            ServerFnError::ServerError { code, .. } => {
312                return StatusCode::from_u16(*code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
313            }
314            _ => return StatusCode::INTERNAL_SERVER_ERROR,
315        }
316    }
317
318    if let Some(err) = error.downcast_ref::<StatusCode>() {
319        return *err;
320    }
321
322    if let Some(err) = error.downcast_ref::<HttpError>() {
323        return err.status;
324    }
325
326    StatusCode::INTERNAL_SERVER_ERROR
327}