1use super::{SerializedDataId, SharedContext};
2use crate::{PinnedFuture, PinnedStream};
3use futures::{
4 future::join_all,
5 stream::{self, once},
6 Stream, StreamExt,
7};
8use or_poisoned::OrPoisoned;
9use std::{
10 collections::HashSet,
11 fmt::{Debug, Write},
12 mem,
13 pin::Pin,
14 sync::{
15 atomic::{AtomicBool, AtomicUsize, Ordering},
16 Arc, Mutex, RwLock,
17 },
18 task::{Context, Poll},
19};
20use throw_error::{Error, ErrorId};
21
22type AsyncDataBuf = Arc<RwLock<Vec<(SerializedDataId, PinnedFuture<String>)>>>;
23type ErrorBuf = Arc<RwLock<Vec<(SerializedDataId, ErrorId, Error)>>>;
24type SealedErrors = Arc<RwLock<HashSet<SerializedDataId>>>;
25
26#[derive(Default)]
27pub struct SsrSharedContext {
29 id: AtomicUsize,
30 non_hydration_id: AtomicUsize,
31 is_hydrating: AtomicBool,
32 sync_buf: RwLock<Vec<ResolvedData>>,
33 async_buf: AsyncDataBuf,
34 errors: ErrorBuf,
35 sealed_error_boundaries: SealedErrors,
36 deferred: Mutex<Vec<PinnedFuture<()>>>,
37 incomplete: Arc<Mutex<Vec<SerializedDataId>>>,
38}
39
40impl SsrSharedContext {
41 pub fn new() -> Self {
43 Self {
44 is_hydrating: AtomicBool::new(true),
45 non_hydration_id: AtomicUsize::new(usize::MAX),
46 ..Default::default()
47 }
48 }
49
50 pub fn new_islands() -> Self {
55 Self {
56 is_hydrating: AtomicBool::new(false),
57 non_hydration_id: AtomicUsize::new(usize::MAX),
58 ..Default::default()
59 }
60 }
61
62 pub async fn consume_buffers(&self) -> Vec<(SerializedDataId, String)> {
69 let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
70 let async_data = mem::take(&mut *self.async_buf.write().or_poisoned());
71
72 let mut all_data = Vec::new();
73 for resolved in sync_data {
74 all_data.push((resolved.0, resolved.1));
75 }
76 for (id, fut) in async_data {
77 let data = fut.await;
78 all_data.push((id, data));
79 }
80 all_data
81 }
82}
83
84impl Debug for SsrSharedContext {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("SsrSharedContext")
87 .field("id", &self.id)
88 .field("is_hydrating", &self.is_hydrating)
89 .field("sync_buf", &self.sync_buf)
90 .field("async_buf", &self.async_buf.read().or_poisoned().len())
91 .finish()
92 }
93}
94
95impl SharedContext for SsrSharedContext {
96 fn is_browser(&self) -> bool {
97 false
98 }
99
100 #[track_caller]
101 fn next_id(&self) -> SerializedDataId {
102 let id = if self.get_is_hydrating() {
103 self.id.fetch_add(1, Ordering::Relaxed)
104 } else {
105 self.non_hydration_id.fetch_sub(1, Ordering::Relaxed)
106 };
107 SerializedDataId(id)
108 }
109
110 fn write_async(&self, id: SerializedDataId, fut: PinnedFuture<String>) {
111 self.async_buf.write().or_poisoned().push((id, fut))
112 }
113
114 fn read_data(&self, _id: &SerializedDataId) -> Option<String> {
115 None
116 }
117
118 fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
119 None
120 }
121
122 fn get_is_hydrating(&self) -> bool {
123 self.is_hydrating.load(Ordering::SeqCst)
124 }
125
126 fn set_is_hydrating(&self, is_hydrating: bool) {
127 self.is_hydrating.store(is_hydrating, Ordering::SeqCst)
128 }
129
130 fn errors(&self, boundary_id: &SerializedDataId) -> Vec<(ErrorId, Error)> {
131 self.errors
132 .read()
133 .or_poisoned()
134 .iter()
135 .filter_map(|(boundary, id, error)| {
136 if boundary == boundary_id {
137 Some((id.clone(), error.clone()))
138 } else {
139 None
140 }
141 })
142 .collect()
143 }
144
145 fn register_error(
146 &self,
147 error_boundary_id: SerializedDataId,
148 error_id: ErrorId,
149 error: Error,
150 ) {
151 self.errors.write().or_poisoned().push((
152 error_boundary_id,
153 error_id,
154 error,
155 ));
156 }
157
158 fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, Error)> {
159 mem::take(&mut *self.errors.write().or_poisoned())
160 }
161
162 fn seal_errors(&self, boundary_id: &SerializedDataId) {
163 self.sealed_error_boundaries
164 .write()
165 .or_poisoned()
166 .insert(boundary_id.clone());
167 }
168
169 fn pending_data(&self) -> Option<PinnedStream<String>> {
170 let sync_data = mem::take(&mut *self.sync_buf.write().or_poisoned());
171 let async_data = self.async_buf.read().or_poisoned();
172
173 let mut initial_chunk = String::new();
175 initial_chunk.push_str("__RESOLVED_RESOURCES=[");
177 for resolved in sync_data {
178 resolved.write_to_buf(&mut initial_chunk);
179 initial_chunk.push(',');
180 }
181 initial_chunk.push_str("];");
182
183 initial_chunk.push_str("__SERIALIZED_ERRORS=[");
184 for error in mem::take(&mut *self.errors.write().or_poisoned()) {
185 _ = write!(
186 initial_chunk,
187 "[{}, {}, {:?}],",
188 error.0 .0,
189 error.1,
190 error.2.to_string()
191 );
192 }
193 initial_chunk.push_str("];");
194
195 initial_chunk.push_str("__PENDING_RESOURCES=[");
197 for (id, _) in async_data.iter() {
198 _ = write!(&mut initial_chunk, "{},", id.0);
199 }
200 initial_chunk.push_str("];");
201
202 initial_chunk.push_str("__RESOURCE_RESOLVERS=[];");
204
205 let async_data = AsyncDataStream {
206 async_buf: Arc::clone(&self.async_buf),
207 errors: Arc::clone(&self.errors),
208 sealed_error_boundaries: Arc::clone(&self.sealed_error_boundaries),
209 };
210
211 let incomplete = Arc::clone(&self.incomplete);
212
213 let stream = stream::once(async move { initial_chunk })
214 .chain(async_data)
215 .chain(once(async move {
216 let mut script = String::new();
217 script.push_str("__INCOMPLETE_CHUNKS=[");
218 for chunk in mem::take(&mut *incomplete.lock().or_poisoned()) {
219 _ = write!(script, "{},", chunk.0);
220 }
221 script.push_str("];");
222 script
223 }));
224 Some(Box::pin(stream))
225 }
226
227 fn during_hydration(&self) -> bool {
228 false
229 }
230
231 fn hydration_complete(&self) {}
232
233 fn defer_stream(&self, wait_for: PinnedFuture<()>) {
234 self.deferred.lock().or_poisoned().push(wait_for);
235 }
236
237 fn await_deferred(&self) -> Option<PinnedFuture<()>> {
238 let deferred = mem::take(&mut *self.deferred.lock().or_poisoned());
239 if deferred.is_empty() {
240 None
241 } else {
242 Some(Box::pin(async move {
243 join_all(deferred).await;
244 }))
245 }
246 }
247
248 fn set_incomplete_chunk(&self, id: SerializedDataId) {
249 self.incomplete.lock().or_poisoned().push(id);
250 }
251
252 fn get_incomplete_chunk(&self, id: &SerializedDataId) -> bool {
253 self.incomplete
254 .lock()
255 .or_poisoned()
256 .iter()
257 .any(|entry| entry == id)
258 }
259}
260
261struct AsyncDataStream {
262 async_buf: AsyncDataBuf,
263 errors: ErrorBuf,
264 sealed_error_boundaries: SealedErrors,
265}
266
267impl Stream for AsyncDataStream {
268 type Item = String;
269
270 fn poll_next(
271 self: Pin<&mut Self>,
272 cx: &mut Context<'_>,
273 ) -> Poll<Option<Self::Item>> {
274 let mut resolved = String::new();
275 let mut async_buf = self.async_buf.write().or_poisoned();
276 let data = mem::take(&mut *async_buf);
277 for (id, mut fut) in data {
278 match fut.as_mut().poll(cx) {
279 Poll::Pending => {
281 async_buf.push((id, fut));
282 }
283 Poll::Ready(data) => {
284 let data = data.replace('<', "\\u003c");
285 _ = write!(
286 resolved,
287 "__RESOLVED_RESOURCES[{}] = {:?};",
288 id.0, data
289 );
290 }
291 }
292 }
293 let sealed = self.sealed_error_boundaries.read().or_poisoned();
294 for error in mem::take(&mut *self.errors.write().or_poisoned()) {
295 if !sealed.contains(&error.0) {
296 _ = write!(
297 resolved,
298 "__SERIALIZED_ERRORS.push([{}, {}, {:?}]);",
299 error.0 .0,
300 error.1,
301 error.2.to_string()
302 );
303 }
304 }
305
306 if async_buf.is_empty() && resolved.is_empty() {
307 return Poll::Ready(None);
308 }
309 if resolved.is_empty() {
310 return Poll::Pending;
311 }
312
313 Poll::Ready(Some(resolved))
314 }
315}
316
317#[derive(Debug)]
318struct ResolvedData(SerializedDataId, String);
319
320impl ResolvedData {
321 pub fn write_to_buf(&self, buf: &mut String) {
322 let ResolvedData(id, ser) = self;
323 let ser = ser.replace('<', "\\u003c");
325 write!(buf, "{}: {:?}", id.0, ser).unwrap();
326 }
327}