dioxus_fullstack_core/streaming.rs
1use crate::{HttpError, ServerFnError};
2use axum_core::extract::FromRequest;
3use axum_core::response::IntoResponse;
4use dioxus_core::{CapturedError, ReactiveContext};
5use http::StatusCode;
6use http::{request::Parts, HeaderMap};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12/// The context provided by dioxus fullstack for server-side rendering.
13///
14/// This context will only be set on the server during the initial streaming response
15/// and inside server functions.
16#[derive(Clone, Debug)]
17pub struct FullstackContext {
18 // We expose the lock for request headers directly so it needs to be in a separate lock
19 request_headers: Arc<RwLock<http::request::Parts>>,
20
21 // The rest of the fields are only held internally, so we can group them together
22 lock: Arc<RwLock<FullstackContextInner>>,
23}
24
// `FullstackContext` is always set when either
// 1. rendering the app via SSR
// 2. handling a server function request
tokio::task_local! {
    // Task-local slot: populated by `FullstackContext::scope` for the duration of a future
    // and read back by `FullstackContext::current`.
    static FULLSTACK_CONTEXT: FullstackContext;
}
31
/// The part of a `FullstackContext` that is only accessed internally and is kept behind
/// a single shared lock.
pub struct FullstackContextInner {
    // Whether the initial chunk of the response has been committed yet.
    current_status: StreamingStatus,
    // Reactive contexts that read `current_status`; they are drained and marked dirty
    // when the initial chunk is committed.
    current_status_subscribers: HashSet<ReactiveContext>,
    // Headers to send with the response. Becomes `None` once the headers have been taken.
    response_headers: Option<HeaderMap>,
    // HTTP status (and optional message) currently associated with the route.
    route_http_status: HttpError,
    // Reactive contexts that read `route_http_status`; they are drained and marked dirty
    // whenever the status is replaced.
    route_http_status_subscribers: HashSet<ReactiveContext>,
}
39
40impl Debug for FullstackContextInner {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("FullstackContextInner")
43 .field("current_status", &self.current_status)
44 .field("response_headers", &self.response_headers)
45 .field("route_http_status", &self.route_http_status)
46 .finish()
47 }
48}
49
50impl PartialEq for FullstackContext {
51 fn eq(&self, other: &Self) -> bool {
52 Arc::ptr_eq(&self.lock, &other.lock)
53 && Arc::ptr_eq(&self.request_headers, &other.request_headers)
54 }
55}
56
57impl FullstackContext {
58 /// Create a new streaming context. You should not need to call this directly. Dioxus fullstack will
59 /// provide this context for you.
60 pub fn new(parts: Parts) -> Self {
61 Self {
62 request_headers: RwLock::new(parts).into(),
63 lock: RwLock::new(FullstackContextInner {
64 current_status: StreamingStatus::RenderingInitialChunk,
65 current_status_subscribers: Default::default(),
66 route_http_status: HttpError {
67 status: http::StatusCode::OK,
68 message: None,
69 },
70 route_http_status_subscribers: Default::default(),
71 response_headers: Some(HeaderMap::new()),
72 })
73 .into(),
74 }
75 }
76
77 /// Commit the initial chunk of the response. This will be called automatically if you are using the
78 /// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
79 /// to call this manually to start the streaming part of the response.
80 ///
81 /// Once this method has been called, the http response parts can no longer be modified.
82 pub fn commit_initial_chunk(&mut self) {
83 let mut lock = self.lock.write();
84 lock.current_status = StreamingStatus::InitialChunkCommitted;
85
86 // The key type is mutable, but the hash is stable through mutations because we hash by pointer
87 #[allow(clippy::mutable_key_type)]
88 let subscribers = std::mem::take(&mut lock.current_status_subscribers);
89 for subscriber in subscribers {
90 subscriber.mark_dirty();
91 }
92 }
93
94 /// Get the current status of the streaming response. This method is reactive and will cause
95 /// the current reactive context to rerun when the status changes.
96 pub fn streaming_state(&self) -> StreamingStatus {
97 let mut lock = self.lock.write();
98 // Register the current reactive context as a subscriber to changes in the streaming status
99 if let Some(ctx) = ReactiveContext::current() {
100 lock.current_status_subscribers.insert(ctx);
101 }
102 lock.current_status
103 }
104
105 /// Access the http request parts mutably. This will allow you to modify headers and other parts of the request.
106 pub fn parts_mut(&self) -> parking_lot::RwLockWriteGuard<'_, http::request::Parts> {
107 self.request_headers.write()
108 }
109
110 /// Run a future within the scope of this FullstackContext.
111 pub async fn scope<F, R>(self, fut: F) -> R
112 where
113 F: std::future::Future<Output = R>,
114 {
115 FULLSTACK_CONTEXT.scope(self, fut).await
116 }
117
118 /// Extract an extension from the current request.
119 pub fn extension<T: Clone + Send + Sync + 'static>(&self) -> Option<T> {
120 let lock = self.request_headers.read();
121 lock.extensions.get::<T>().cloned()
122 }
123
124 /// Extract an axum extractor from the current request.
125 ///
126 /// The body of the request is always empty when using this method, as the body can only be consumed once in the server
127 /// function extractors.
128 pub async fn extract<T: FromRequest<Self, M>, M>() -> Result<T, ServerFnError> {
129 let this = Self::current().unwrap_or_else(|| {
130 // Create a dummy context if one doesn't exist, making the function usable outside of a request context
131 FullstackContext::new(
132 axum_core::extract::Request::builder()
133 .method("GET")
134 .uri("/")
135 .header("X-Dummy-Header", "true")
136 .body(())
137 .unwrap()
138 .into_parts()
139 .0,
140 )
141 });
142
143 let parts = this.request_headers.read().clone();
144 let request = axum_core::extract::Request::from_parts(parts, Default::default());
145 match T::from_request(request, &this).await {
146 Ok(res) => Ok(res),
147 Err(err) => {
148 let resp = err.into_response();
149 Err(ServerFnError::from_axum_response(resp).await)
150 }
151 }
152 }
153
154 /// Get the current `FullstackContext` if it exists. This will return `None` if called on the client
155 /// or outside of a streaming response on the server or server function.
156 pub fn current() -> Option<Self> {
157 // Try to get the context from the task local (for server functions)
158 if let Ok(context) = FULLSTACK_CONTEXT.try_get() {
159 return Some(context);
160 }
161
162 // Otherwise, try to get it from the dioxus runtime context (for streaming SSR)
163 if let Some(rt) = dioxus_core::Runtime::try_current() {
164 let id = rt.try_current_scope_id()?;
165 if let Some(ctx) = rt.consume_context::<FullstackContext>(id) {
166 return Some(ctx);
167 }
168 }
169
170 None
171 }
172
173 /// Get the current HTTP status for the route. This will default to 200 OK, but can be modified
174 /// by calling `FullstackContext::commit_error_status` with an error.
175 pub fn current_http_status(&self) -> HttpError {
176 let mut lock = self.lock.write();
177 // Register the current reactive context as a subscriber to changes in the http status
178 if let Some(ctx) = ReactiveContext::current() {
179 lock.route_http_status_subscribers.insert(ctx);
180 }
181 lock.route_http_status.clone()
182 }
183
184 pub fn set_current_http_status(&mut self, status: HttpError) {
185 let mut lock = self.lock.write();
186 lock.route_http_status = status;
187 // The key type is mutable, but the hash is stable through mutations because we hash by pointer
188 #[allow(clippy::mutable_key_type)]
189 let subscribers = std::mem::take(&mut lock.route_http_status_subscribers);
190 for subscriber in subscribers {
191 subscriber.mark_dirty();
192 }
193 }
194
195 /// Add a header to the response. This will be sent to the client when the response is committed.
196 pub fn add_response_header(
197 &self,
198 key: impl Into<http::header::HeaderName>,
199 value: impl Into<http::header::HeaderValue>,
200 ) {
201 let mut lock = self.lock.write();
202 if let Some(headers) = lock.response_headers.as_mut() {
203 headers.insert(key.into(), value.into());
204 }
205 }
206
207 /// Take the response headers out of the context. This will leave the context without any headers,
208 /// so it should only be called once when the response is being committed.
209 pub fn take_response_headers(&self) -> Option<HeaderMap> {
210 let mut lock = self.lock.write();
211 lock.response_headers.take()
212 }
213
214 /// Set the current HTTP status for the route. This will be used when committing the response
215 /// to the client.
216 pub fn commit_http_status(status: StatusCode, message: Option<String>) {
217 if let Some(mut ctx) = Self::current() {
218 ctx.set_current_http_status(HttpError { status, message });
219 }
220 }
221
222 /// Commit the CapturedError as the current HTTP status for the route.
223 /// This will attempt to downcast the error to known types and set the appropriate
224 /// status code. If the error type is unknown, it will default to
225 /// `StatusCode::INTERNAL_SERVER_ERROR`.
226 pub fn commit_error_status(error: impl Into<CapturedError>) -> HttpError {
227 let error = error.into();
228 let status = status_code_from_error(&error);
229 let http_error = HttpError {
230 status,
231 message: Some(error.to_string()),
232 };
233
234 if let Some(mut ctx) = Self::current() {
235 ctx.set_current_http_status(http_error.clone());
236 }
237
238 http_error
239 }
240}
241
/// The status of the streaming response
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamingStatus {
    /// The initial chunk is still being rendered. The http response parts can still be modified at this point.
    RenderingInitialChunk,

    /// The initial chunk has been committed and the response is now streaming. The http response parts
    /// have already been sent to the client and can no longer be modified.
    InitialChunkCommitted,
}
252
253/// Commit the initial chunk of the response. This will be called automatically if you are using the
254/// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
255/// to call this manually to start the streaming part of the response.
256///
257/// On the client, this will do nothing.
258///
259/// # Example
260/// ```rust, no_run
261/// # use dioxus::prelude::*;
262/// # use dioxus_fullstack_core::*;
263/// # fn Children() -> Element { unimplemented!() }
264/// fn App() -> Element {
265/// // This will start streaming immediately after the current render is complete.
266/// use_hook(commit_initial_chunk);
267///
268/// rsx! { Children {} }
269/// }
270/// ```
271pub fn commit_initial_chunk() {
272 crate::history::finalize_route();
273 if let Some(mut streaming) = FullstackContext::current() {
274 streaming.commit_initial_chunk();
275 }
276}
277
/// Extract an axum extractor from the current request.
///
/// Deprecated free-function form; it forwards directly to `FullstackContext::extract`
/// and returns that future unchanged.
#[deprecated(note = "Use FullstackContext::extract instead", since = "0.7.0")]
pub fn extract<T: FromRequest<FullstackContext, M>, M>(
) -> impl std::future::Future<Output = Result<T, ServerFnError>> {
    FullstackContext::extract::<T, M>()
}
284
285/// Get the current status of the streaming response. This method is reactive and will cause
286/// the current reactive context to rerun when the status changes.
287///
288/// On the client, this will always return `StreamingStatus::InitialChunkCommitted`.
289///
290/// # Example
291/// ```rust, no_run
292/// # use dioxus::prelude::*;
293/// # use dioxus_fullstack_core::*;
294/// #[component]
295/// fn MetaTitle(title: String) -> Element {
296/// // If streaming has already started, warn the user that the meta tag will not show
297/// // up in the initial chunk.
298/// use_hook(|| {
299/// if current_status() == StreamingStatus::InitialChunkCommitted {
300/// dioxus::logger::tracing::warn!("Since `MetaTitle` was rendered after the initial chunk was committed, the meta tag will not show up in the head without javascript enabled.");
301/// }
302/// });
303///
304/// rsx! { meta { property: "og:title", content: title } }
305/// }
306/// ```
307pub fn current_status() -> StreamingStatus {
308 if let Some(streaming) = FullstackContext::current() {
309 streaming.streaming_state()
310 } else {
311 StreamingStatus::InitialChunkCommitted
312 }
313}
314
315/// Convert a `CapturedError` into an appropriate HTTP status code.
316///
317/// This will attempt to downcast the error to known types and return a corresponding status code.
318/// If the error type is unknown, it will default to `StatusCode::INTERNAL_SERVER_ERROR`.
319pub fn status_code_from_error(error: &CapturedError) -> StatusCode {
320 if let Some(err) = error.downcast_ref::<ServerFnError>() {
321 match err {
322 ServerFnError::ServerError { code, .. } => {
323 return StatusCode::from_u16(*code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
324 }
325 _ => return StatusCode::INTERNAL_SERVER_ERROR,
326 }
327 }
328
329 if let Some(err) = error.downcast_ref::<StatusCode>() {
330 return *err;
331 }
332
333 if let Some(err) = error.downcast_ref::<HttpError>() {
334 return err.status;
335 }
336
337 StatusCode::INTERNAL_SERVER_ERROR
338}