dioxus_fullstack_core/streaming.rs
1use crate::{HttpError, ServerFnError};
2use axum_core::extract::FromRequest;
3use axum_core::response::IntoResponse;
4use dioxus_core::{CapturedError, ReactiveContext};
5use http::StatusCode;
6use http::{request::Parts, HeaderMap};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12/// The context provided by dioxus fullstack for server-side rendering.
13///
14/// This context will only be set on the server during the initial streaming response
15/// and inside server functions.
16#[derive(Clone, Debug)]
17pub struct FullstackContext {
18 // We expose the lock for request headers directly so it needs to be in a separate lock
19 request_headers: Arc<RwLock<http::request::Parts>>,
20
21 // The rest of the fields are only held internally, so we can group them together
22 lock: Arc<RwLock<FullstackContextInner>>,
23}
24
// `FullstackContext` is always set when either
// 1. rendering the app via SSR
// 2. handling a server function request
//
// This task-local slot is filled for the duration of a future by `FullstackContext::scope`
// and is the first place `FullstackContext::current` looks.
tokio::task_local! {
    static FULLSTACK_CONTEXT: FullstackContext;
}
31
/// Internal state of a `FullstackContext`, stored behind the context's `lock` field and
/// therefore shared between all clones of that context.
pub struct FullstackContextInner {
    // Whether the initial chunk is still rendering or has already been committed
    current_status: StreamingStatus,
    // Reactive contexts to mark dirty the next time `current_status` changes
    current_status_subscribers: HashSet<ReactiveContext>,
    // Headers queued for the response; becomes `None` once they have been taken
    response_headers: Option<HeaderMap>,
    // HTTP status (and optional message) currently recorded for the route
    route_http_status: HttpError,
    // Reactive contexts to mark dirty the next time `route_http_status` changes
    route_http_status_subscribers: HashSet<ReactiveContext>,
}
39
40impl Debug for FullstackContextInner {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("FullstackContextInner")
43 .field("current_status", &self.current_status)
44 .field("response_headers", &self.response_headers)
45 .field("route_http_status", &self.route_http_status)
46 .finish()
47 }
48}
49
50impl PartialEq for FullstackContext {
51 fn eq(&self, other: &Self) -> bool {
52 Arc::ptr_eq(&self.lock, &other.lock)
53 && Arc::ptr_eq(&self.request_headers, &other.request_headers)
54 }
55}
56
57impl FullstackContext {
58 /// Create a new streaming context. You should not need to call this directly. Dioxus fullstack will
59 /// provide this context for you.
60 pub fn new(parts: Parts) -> Self {
61 Self {
62 request_headers: RwLock::new(parts).into(),
63 lock: RwLock::new(FullstackContextInner {
64 current_status: StreamingStatus::RenderingInitialChunk,
65 current_status_subscribers: Default::default(),
66 route_http_status: HttpError {
67 status: http::StatusCode::OK,
68 message: None,
69 },
70 route_http_status_subscribers: Default::default(),
71 response_headers: Some(HeaderMap::new()),
72 })
73 .into(),
74 }
75 }
76
77 /// Commit the initial chunk of the response. This will be called automatically if you are using the
78 /// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
79 /// to call this manually to start the streaming part of the response.
80 ///
81 /// Once this method has been called, the http response parts can no longer be modified.
82 pub fn commit_initial_chunk(&mut self) {
83 let mut lock = self.lock.write();
84 lock.current_status = StreamingStatus::InitialChunkCommitted;
85
86 // The key type is mutable, but the hash is stable through mutations because we hash by pointer
87 #[allow(clippy::mutable_key_type)]
88 let subscribers = std::mem::take(&mut lock.current_status_subscribers);
89 for subscriber in subscribers {
90 subscriber.mark_dirty();
91 }
92 }
93
94 /// Get the current status of the streaming response. This method is reactive and will cause
95 /// the current reactive context to rerun when the status changes.
96 pub fn streaming_state(&self) -> StreamingStatus {
97 let mut lock = self.lock.write();
98 // Register the current reactive context as a subscriber to changes in the streaming status
99 if let Some(ctx) = ReactiveContext::current() {
100 lock.current_status_subscribers.insert(ctx);
101 }
102 lock.current_status
103 }
104
105 /// Access the http request parts mutably. This will allow you to modify headers and other parts of the request.
106 pub fn parts_mut(&self) -> parking_lot::RwLockWriteGuard<'_, http::request::Parts> {
107 self.request_headers.write()
108 }
109
110 /// Run a future within the scope of this FullstackContext.
111 pub async fn scope<F, R>(self, fut: F) -> R
112 where
113 F: std::future::Future<Output = R>,
114 {
115 FULLSTACK_CONTEXT.scope(self, fut).await
116 }
117
118 /// Extract an extension from the current request.
119 pub fn extension<T: Clone + Send + Sync + 'static>(&self) -> Option<T> {
120 let lock = self.request_headers.read();
121 lock.extensions.get::<T>().cloned()
122 }
123
124 /// Extract an axum extractor from the current request.
125 ///
126 /// The body of the request is always empty when using this method, as the body can only be consumed once in the server
127 /// function extractors.
128 pub async fn extract<T: FromRequest<Self, M>, M>() -> Result<T, ServerFnError> {
129 let this = Self::current()
130 .ok_or_else(|| ServerFnError::new("No FullstackContext found".to_string()))?;
131
132 let parts = this.request_headers.read().clone();
133 let request = axum_core::extract::Request::from_parts(parts, Default::default());
134 match T::from_request(request, &this).await {
135 Ok(res) => Ok(res),
136 Err(err) => {
137 let resp = err.into_response();
138 Err(ServerFnError::from_axum_response(resp).await)
139 }
140 }
141 }
142
143 /// Get the current `FullstackContext` if it exists. This will return `None` if called on the client
144 /// or outside of a streaming response on the server or server function.
145 pub fn current() -> Option<Self> {
146 // Try to get the context from the task local (for server functions)
147 if let Ok(context) = FULLSTACK_CONTEXT.try_get() {
148 return Some(context);
149 }
150
151 // Otherwise, try to get it from the dioxus runtime context (for streaming SSR)
152 if let Some(rt) = dioxus_core::Runtime::try_current() {
153 let id = rt.try_current_scope_id()?;
154 if let Some(ctx) = rt.consume_context::<FullstackContext>(id) {
155 return Some(ctx);
156 }
157 }
158
159 None
160 }
161
162 /// Get the current HTTP status for the route. This will default to 200 OK, but can be modified
163 /// by calling `FullstackContext::commit_error_status` with an error.
164 pub fn current_http_status(&self) -> HttpError {
165 let mut lock = self.lock.write();
166 // Register the current reactive context as a subscriber to changes in the http status
167 if let Some(ctx) = ReactiveContext::current() {
168 lock.route_http_status_subscribers.insert(ctx);
169 }
170 lock.route_http_status.clone()
171 }
172
173 pub fn set_current_http_status(&mut self, status: HttpError) {
174 let mut lock = self.lock.write();
175 lock.route_http_status = status;
176 // The key type is mutable, but the hash is stable through mutations because we hash by pointer
177 #[allow(clippy::mutable_key_type)]
178 let subscribers = std::mem::take(&mut lock.route_http_status_subscribers);
179 for subscriber in subscribers {
180 subscriber.mark_dirty();
181 }
182 }
183
184 /// Add a header to the response. This will be sent to the client when the response is committed.
185 pub fn add_response_header(
186 &self,
187 key: impl Into<http::header::HeaderName>,
188 value: impl Into<http::header::HeaderValue>,
189 ) {
190 let mut lock = self.lock.write();
191 if let Some(headers) = lock.response_headers.as_mut() {
192 headers.insert(key.into(), value.into());
193 }
194 }
195
196 /// Take the response headers out of the context. This will leave the context without any headers,
197 /// so it should only be called once when the response is being committed.
198 pub fn take_response_headers(&self) -> Option<HeaderMap> {
199 let mut lock = self.lock.write();
200 lock.response_headers.take()
201 }
202
203 /// Set the current HTTP status for the route. This will be used when committing the response
204 /// to the client.
205 pub fn commit_http_status(status: StatusCode, message: Option<String>) {
206 if let Some(mut ctx) = Self::current() {
207 ctx.set_current_http_status(HttpError { status, message });
208 }
209 }
210
211 /// Commit the CapturedError as the current HTTP status for the route.
212 /// This will attempt to downcast the error to known types and set the appropriate
213 /// status code. If the error type is unknown, it will default to
214 /// `StatusCode::INTERNAL_SERVER_ERROR`.
215 pub fn commit_error_status(error: impl Into<CapturedError>) -> HttpError {
216 let error = error.into();
217 let status = status_code_from_error(&error);
218 let http_error = HttpError {
219 status,
220 message: Some(error.to_string()),
221 };
222
223 if let Some(mut ctx) = Self::current() {
224 ctx.set_current_http_status(http_error.clone());
225 }
226
227 http_error
228 }
229}
230
/// The status of the streaming response
///
/// Equality between statuses is total, so the type implements `Eq` in addition to `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamingStatus {
    /// The initial chunk is still being rendered. The http response parts can still be modified at this point.
    RenderingInitialChunk,

    /// The initial chunk has been committed and the response is now streaming. The http response parts
    /// have already been sent to the client and can no longer be modified.
    InitialChunkCommitted,
}
241
242/// Commit the initial chunk of the response. This will be called automatically if you are using the
243/// dioxus router when the suspense boundary above the router is resolved. Otherwise, you will need
244/// to call this manually to start the streaming part of the response.
245///
246/// On the client, this will do nothing.
247///
248/// # Example
249/// ```rust, no_run
250/// # use dioxus::prelude::*;
251/// # use dioxus_fullstack_core::*;
252/// # fn Children() -> Element { unimplemented!() }
253/// fn App() -> Element {
254/// // This will start streaming immediately after the current render is complete.
255/// use_hook(commit_initial_chunk);
256///
257/// rsx! { Children {} }
258/// }
259/// ```
260pub fn commit_initial_chunk() {
261 crate::history::finalize_route();
262 if let Some(mut streaming) = FullstackContext::current() {
263 streaming.commit_initial_chunk();
264 }
265}
266
267/// Extract an axum extractor from the current request.
268#[deprecated(note = "Use FullstackContext::extract instead", since = "0.7.0")]
269pub fn extract<T: FromRequest<FullstackContext, M>, M>(
270) -> impl std::future::Future<Output = Result<T, ServerFnError>> {
271 FullstackContext::extract::<T, M>()
272}
273
274/// Get the current status of the streaming response. This method is reactive and will cause
275/// the current reactive context to rerun when the status changes.
276///
277/// On the client, this will always return `StreamingStatus::InitialChunkCommitted`.
278///
279/// # Example
280/// ```rust, no_run
281/// # use dioxus::prelude::*;
282/// # use dioxus_fullstack_core::*;
283/// #[component]
284/// fn MetaTitle(title: String) -> Element {
285/// // If streaming has already started, warn the user that the meta tag will not show
286/// // up in the initial chunk.
287/// use_hook(|| {
288/// if current_status() == StreamingStatus::InitialChunkCommitted {
289/// dioxus::logger::tracing::warn!("Since `MetaTitle` was rendered after the initial chunk was committed, the meta tag will not show up in the head without javascript enabled.");
290/// }
291/// });
292///
293/// rsx! { meta { property: "og:title", content: title } }
294/// }
295/// ```
296pub fn current_status() -> StreamingStatus {
297 if let Some(streaming) = FullstackContext::current() {
298 streaming.streaming_state()
299 } else {
300 StreamingStatus::InitialChunkCommitted
301 }
302}
303
304/// Convert a `CapturedError` into an appropriate HTTP status code.
305///
306/// This will attempt to downcast the error to known types and return a corresponding status code.
307/// If the error type is unknown, it will default to `StatusCode::INTERNAL_SERVER_ERROR`.
308pub fn status_code_from_error(error: &CapturedError) -> StatusCode {
309 if let Some(err) = error.downcast_ref::<ServerFnError>() {
310 match err {
311 ServerFnError::ServerError { code, .. } => {
312 return StatusCode::from_u16(*code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
313 }
314 _ => return StatusCode::INTERNAL_SERVER_ERROR,
315 }
316 }
317
318 if let Some(err) = error.downcast_ref::<StatusCode>() {
319 return *err;
320 }
321
322 if let Some(err) = error.downcast_ref::<HttpError>() {
323 return err.status;
324 }
325
326 StatusCode::INTERNAL_SERVER_ERROR
327}