hydration_context/
ssr.rs

1use super::{SerializedDataId, SharedContext};
2use crate::{PinnedFuture, PinnedStream};
3use futures::{
4    future::join_all,
5    stream::{self, once},
6    Stream, StreamExt,
7};
8use or_poisoned::OrPoisoned;
9use std::{
10    collections::HashSet,
11    fmt::{Debug, Write},
12    mem,
13    pin::Pin,
14    sync::{
15        atomic::{AtomicBool, AtomicUsize, Ordering},
16        Arc, Mutex, RwLock,
17    },
18    task::{Context, Poll},
19};
20use throw_error::{Error, ErrorId};
21
/// Shared, lock-protected queue of async resources. Each entry pairs a serialized-data ID
/// with the future that resolves to that resource's serialized `String`.
type AsyncDataBuf = Arc<RwLock<Vec<(SerializedDataId, PinnedFuture<String>)>>>;
/// Shared, lock-protected list of registered errors, stored as
/// `(error boundary ID, error ID, error)` tuples.
type ErrorBuf = Arc<RwLock<Vec<(SerializedDataId, ErrorId, Error)>>>;
/// Shared, lock-protected set of error-boundary IDs that have been sealed.
type SealedErrors = Arc<RwLock<HashSet<SerializedDataId>>>;
25
#[derive(Default)]
/// The shared context that should be used on the server side.
///
/// It hands out serialized-data IDs, queues resources and errors while the app renders,
/// and turns them into a stream of script chunks via `pending_data`.
///
/// Note that the derived `Default` zero-initializes every field; `new` and `new_islands`
/// override the hydration flag and the non-hydration counter.
pub struct SsrSharedContext {
    /// Counter incremented by `next_id` while hydrating.
    id: AtomicUsize,
    /// Counter decremented by `next_id` while not hydrating; the constructors start it at
    /// `usize::MAX`.
    non_hydration_id: AtomicUsize,
    /// Hydration flag read by `get_is_hydrating` and written by `set_is_hydrating`.
    is_hydrating: AtomicBool,
    /// Already-resolved data; drained by `consume_buffers` and `pending_data`.
    sync_buf: RwLock<Vec<ResolvedData>>,
    /// Async resources queued by `write_async`.
    async_buf: AsyncDataBuf,
    /// Errors recorded by `register_error`.
    errors: ErrorBuf,
    /// Error boundaries marked by `seal_errors`.
    sealed_error_boundaries: SealedErrors,
    /// Futures queued by `defer_stream` and joined by `await_deferred`.
    deferred: Mutex<Vec<PinnedFuture<()>>>,
    /// IDs recorded by `set_incomplete_chunk`; emitted in the last chunk of `pending_data`.
    incomplete: Arc<Mutex<Vec<SerializedDataId>>>,
}
39
40impl SsrSharedContext {
41    /// Creates a new shared context for rendering HTML on the server.
42    pub fn new() -> Self {
43        Self {
44            is_hydrating: AtomicBool::new(true),
45            non_hydration_id: AtomicUsize::new(usize::MAX),
46            ..Default::default()
47        }
48    }
49
50    /// Creates a new shared context for rendering HTML on the server in "islands" mode.
51    ///
52    /// This defaults to a mode in which the app is not hydrated, but allows you to opt into
53    /// hydration for certain portions using [`SharedContext::set_is_hydrating`].
54    pub fn new_islands() -> Self {
55        Self {
56            is_hydrating: AtomicBool::new(false),
57            non_hydration_id: AtomicUsize::new(usize::MAX),
58            ..Default::default()
59        }
60    }
61
62    /// Consume the data buffers, awaiting all async resources,
63    /// returning both sync and async buffers.
64    /// Useful to implement custom hydration contexts.
65    ///
66    /// WARNING: this will clear the internal buffers, it should only be called once.
67    /// A second call would return an empty `vec![]`.
68    pub async fn consume_buffers(&self) -> Vec<(SerializedDataId, String)> {
69        let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
70        let async_data = mem::take(&mut *self.async_buf.write().or_poisoned());
71
72        let mut all_data = Vec::new();
73        for resolved in sync_data {
74            all_data.push((resolved.0, resolved.1));
75        }
76        for (id, fut) in async_data {
77            let data = fut.await;
78            all_data.push((id, data));
79        }
80        all_data
81    }
82}
83
84impl Debug for SsrSharedContext {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("SsrSharedContext")
87            .field("id", &self.id)
88            .field("is_hydrating", &self.is_hydrating)
89            .field("sync_buf", &self.sync_buf)
90            .field("async_buf", &self.async_buf.read().or_poisoned().len())
91            .finish()
92    }
93}
94
95impl SharedContext for SsrSharedContext {
    /// Reports whether this context runs in the browser; the server-side context always
    /// returns `false`.
    fn is_browser(&self) -> bool {
        false
    }
99
100    #[track_caller]
101    fn next_id(&self) -> SerializedDataId {
102        let id = if self.get_is_hydrating() {
103            self.id.fetch_add(1, Ordering::Relaxed)
104        } else {
105            self.non_hydration_id.fetch_sub(1, Ordering::Relaxed)
106        };
107        SerializedDataId(id)
108    }
109
110    fn write_async(&self, id: SerializedDataId, fut: PinnedFuture<String>) {
111        self.async_buf.write().or_poisoned().push((id, fut))
112    }
113
    /// Always returns `None`: this server-side context has no serialized data to read
    /// back, so `_id` is ignored.
    fn read_data(&self, _id: &SerializedDataId) -> Option<String> {
        None
    }
117
    /// Always returns `None`: this server-side context has no serialized data to wait
    /// for, so `_id` is ignored.
    fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
        None
    }
121
122    fn get_is_hydrating(&self) -> bool {
123        self.is_hydrating.load(Ordering::SeqCst)
124    }
125
126    fn set_is_hydrating(&self, is_hydrating: bool) {
127        self.is_hydrating.store(is_hydrating, Ordering::SeqCst)
128    }
129
130    fn errors(&self, boundary_id: &SerializedDataId) -> Vec<(ErrorId, Error)> {
131        self.errors
132            .read()
133            .or_poisoned()
134            .iter()
135            .filter_map(|(boundary, id, error)| {
136                if boundary == boundary_id {
137                    Some((id.clone(), error.clone()))
138                } else {
139                    None
140                }
141            })
142            .collect()
143    }
144
145    fn register_error(
146        &self,
147        error_boundary_id: SerializedDataId,
148        error_id: ErrorId,
149        error: Error,
150    ) {
151        self.errors.write().or_poisoned().push((
152            error_boundary_id,
153            error_id,
154            error,
155        ));
156    }
157
158    fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, Error)> {
159        mem::take(&mut *self.errors.write().or_poisoned())
160    }
161
162    fn seal_errors(&self, boundary_id: &SerializedDataId) {
163        self.sealed_error_boundaries
164            .write()
165            .or_poisoned()
166            .insert(boundary_id.clone());
167    }
168
169    fn pending_data(&self) -> Option<PinnedStream<String>> {
170        let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
171        let async_data = self.async_buf.read().or_poisoned();
172
173        // 1) initial, synchronous setup chunk
174        let mut initial_chunk = String::new();
175        // resolved synchronous resources and errors
176        initial_chunk.push_str("__RESOLVED_RESOURCES=[");
177        for resolved in sync_data {
178            resolved.write_to_buf(&mut initial_chunk);
179            initial_chunk.push(',');
180        }
181        initial_chunk.push_str("];");
182
183        initial_chunk.push_str("__SERIALIZED_ERRORS=[");
184        for error in mem::take(&mut *self.errors.write().or_poisoned()) {
185            _ = write!(
186                initial_chunk,
187                "[{}, {}, {:?}],",
188                error.0 .0,
189                error.1,
190                error.2.to_string()
191            );
192        }
193        initial_chunk.push_str("];");
194
195        // pending async resources
196        initial_chunk.push_str("__PENDING_RESOURCES=[");
197        for (id, _) in async_data.iter() {
198            _ = write!(&mut initial_chunk, "{},", id.0);
199        }
200        initial_chunk.push_str("];");
201
202        // resolvers
203        initial_chunk.push_str("__RESOURCE_RESOLVERS=[];");
204
205        let async_data = AsyncDataStream {
206            async_buf: Arc::clone(&self.async_buf),
207            errors: Arc::clone(&self.errors),
208            sealed_error_boundaries: Arc::clone(&self.sealed_error_boundaries),
209        };
210
211        let incomplete = Arc::clone(&self.incomplete);
212
213        let stream = stream::once(async move { initial_chunk })
214            .chain(async_data)
215            .chain(once(async move {
216                let mut script = String::new();
217                script.push_str("__INCOMPLETE_CHUNKS=[");
218                for chunk in mem::take(&mut *incomplete.lock().or_poisoned()) {
219                    _ = write!(script, "{},", chunk.0);
220                }
221                script.push_str("];");
222                script
223            }));
224        Some(Box::pin(stream))
225    }
226
    /// Always returns `false` for the server-side context.
    fn during_hydration(&self) -> bool {
        false
    }
230
    /// Does nothing for the server-side context.
    fn hydration_complete(&self) {}
232
233    fn defer_stream(&self, wait_for: PinnedFuture<()>) {
234        self.deferred.lock().or_poisoned().push(wait_for);
235    }
236
237    fn await_deferred(&self) -> Option<PinnedFuture<()>> {
238        let deferred = mem::take(&mut *self.deferred.lock().or_poisoned());
239        if deferred.is_empty() {
240            None
241        } else {
242            Some(Box::pin(async move {
243                join_all(deferred).await;
244            }))
245        }
246    }
247
248    fn set_incomplete_chunk(&self, id: SerializedDataId) {
249        self.incomplete.lock().or_poisoned().push(id);
250    }
251
252    fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool {
253        self.incomplete
254            .lock()
255            .or_poisoned()
256            .iter()
257            .any(|entry| entry == id)
258    }
259}
260
/// Stream of script snippets for async resources and errors.
///
/// It holds shared handles to the async queue, the error list and the sealed-boundary set,
/// so entries added to them after the stream was created are still picked up when the
/// stream is polled.
struct AsyncDataStream {
    /// Queue of `(id, future)` pairs still waiting to resolve.
    async_buf: AsyncDataBuf,
    /// Registered errors; drained on every poll.
    errors: ErrorBuf,
    /// Boundaries whose errors are not written to the stream.
    sealed_error_boundaries: SealedErrors,
}
266
267impl Stream for AsyncDataStream {
268    type Item = String;
269
270    fn poll_next(
271        self: Pin<&mut Self>,
272        cx: &mut Context<'_>,
273    ) -> Poll<Option<Self::Item>> {
274        let mut resolved = String::new();
275        let mut async_buf = self.async_buf.write().or_poisoned();
276        let data = mem::take(&mut *async_buf);
277        for (id, mut fut) in data {
278            match fut.as_mut().poll(cx) {
279                // if it's not ready, put it back into the queue
280                Poll::Pending => {
281                    async_buf.push((id, fut));
282                }
283                Poll::Ready(data) => {
284                    let data = data.replace('<', "\\u003c");
285                    _ = write!(
286                        resolved,
287                        "__RESOLVED_RESOURCES[{}] = {:?};",
288                        id.0, data
289                    );
290                }
291            }
292        }
293        let sealed = self.sealed_error_boundaries.read().or_poisoned();
294        for error in mem::take(&mut *self.errors.write().or_poisoned()) {
295            if !sealed.contains(&error.0) {
296                _ = write!(
297                    resolved,
298                    "__SERIALIZED_ERRORS.push([{}, {}, {:?}]);",
299                    error.0 .0,
300                    error.1,
301                    error.2.to_string()
302                );
303            }
304        }
305
306        if async_buf.is_empty() && resolved.is_empty() {
307            return Poll::Ready(None);
308        }
309        if resolved.is_empty() {
310            return Poll::Pending;
311        }
312
313        Poll::Ready(Some(resolved))
314    }
315}
316
317#[derive(Debug)]
318struct ResolvedData(SerializedDataId, String);
319
320impl ResolvedData {
321    pub fn write_to_buf(&self, buf: &mut String) {
322        let ResolvedData(id, ser) = self;
323        // escapes < to prevent it being interpreted as another opening HTML tag
324        let ser = ser.replace('<', "\\u003c");
325        write!(buf, "{}: {:?}", id.0, ser).unwrap();
326    }
327}