leptos_reactive/
suspense.rs1use crate::{
4 batch, create_isomorphic_effect, create_memo, create_rw_signal,
5 create_signal, oco::Oco, queue_microtask, store_value, Memo, ReadSignal,
6 ResourceId, RwSignal, SignalSet, SignalUpdate, SignalWith, StoredValue,
7 WriteSignal,
8};
9use futures::Future;
10use rustc_hash::FxHashSet;
11use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc};
12
13#[derive(Copy, Clone, Debug)]
16pub struct SuspenseContext {
17 pub pending_resources: ReadSignal<usize>,
19 set_pending_resources: WriteSignal<usize>,
20 pub(crate) pending: RwSignal<FxHashSet<ResourceId>>,
23 pub(crate) pending_serializable_resources: RwSignal<FxHashSet<ResourceId>>,
24 pub(crate) pending_serializable_resources_count: RwSignal<usize>,
25 pub(crate) local_status: StoredValue<Option<LocalStatus>>,
26 pub(crate) should_block: StoredValue<bool>,
27}
28
29#[derive(Copy, Clone, Debug, PartialEq, Eq)]
30pub(crate) enum LocalStatus {
31 LocalOnly,
32 Mixed,
33 SerializableOnly,
34}
35
36#[derive(Clone, Debug)]
40pub struct GlobalSuspenseContext(Rc<RefCell<SuspenseContext>>);
41
42impl GlobalSuspenseContext {
43 pub fn new() -> Self {
45 Self(Rc::new(RefCell::new(SuspenseContext::new())))
46 }
47
48 pub fn with_inner<T>(&self, f: impl FnOnce(&SuspenseContext) -> T) -> T {
50 f(&self.0.borrow())
51 }
52
53 pub fn reset(&self) {
55 let mut inner = self.0.borrow_mut();
56 _ = std::mem::replace(&mut *inner, SuspenseContext::new());
57 }
58}
59
60impl Default for GlobalSuspenseContext {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66impl SuspenseContext {
67 pub fn has_local_only(&self) -> bool {
70 matches!(self.local_status.get_value(), Some(LocalStatus::LocalOnly))
71 }
72
73 pub fn has_any_local(&self) -> bool {
75 matches!(
76 self.local_status.get_value(),
77 Some(LocalStatus::LocalOnly) | Some(LocalStatus::Mixed)
78 )
79 }
80
81 pub fn should_block(&self) -> bool {
84 self.should_block.get_value()
85 }
86
87 pub fn to_future(&self) -> impl Future<Output = ()> {
89 use futures::StreamExt;
90
91 let pending = self.pending;
92 let (tx, mut rx) = futures::channel::mpsc::channel(1);
93 let tx = RefCell::new(tx);
94 queue_microtask(move || {
95 create_isomorphic_effect(move |_| {
96 if pending.with(|p| p.is_empty()) {
97 _ = tx.borrow_mut().try_send(());
98 }
99 });
100 });
101 async move {
102 rx.next().await;
103 }
104 }
105
106 pub fn none_pending(&self) -> bool {
108 self.pending.with(|p| p.is_empty())
109 }
110}
111
112impl std::hash::Hash for SuspenseContext {
113 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
114 self.pending.id.hash(state);
115 }
116}
117
118impl PartialEq for SuspenseContext {
119 fn eq(&self, other: &Self) -> bool {
120 self.pending.id == other.pending.id
121 }
122}
123
124impl Eq for SuspenseContext {}
125
126impl SuspenseContext {
127 pub fn new() -> Self {
129 let (pending_resources, set_pending_resources) = create_signal(0); let pending_serializable_resources =
131 create_rw_signal(Default::default());
132 let pending_serializable_resources_count = create_rw_signal(0); let local_status = store_value(None);
134 let should_block = store_value(false);
135 let pending = create_rw_signal(Default::default());
136 Self {
137 pending,
138 pending_resources,
139 set_pending_resources,
140 pending_serializable_resources,
141 pending_serializable_resources_count,
142 local_status,
143 should_block,
144 }
145 }
146
147 pub fn increment(&self, serializable: bool) {
149 let setter = self.set_pending_resources;
150 let serializable_resources = self.pending_serializable_resources_count;
151 let local_status = self.local_status;
152 setter.update(|n| *n += 1);
153 if serializable {
154 serializable_resources.update(|n| *n += 1);
155 local_status.update_value(|status| {
156 *status = Some(match status {
157 None => LocalStatus::SerializableOnly,
158 Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
159 Some(LocalStatus::Mixed) => LocalStatus::Mixed,
160 Some(LocalStatus::SerializableOnly) => {
161 LocalStatus::SerializableOnly
162 }
163 });
164 });
165 } else {
166 local_status.update_value(|status| {
167 *status = Some(match status {
168 None => LocalStatus::LocalOnly,
169 Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
170 Some(LocalStatus::Mixed) => LocalStatus::Mixed,
171 Some(LocalStatus::SerializableOnly) => LocalStatus::Mixed,
172 });
173 });
174 }
175 }
176
177 pub fn decrement(&self, serializable: bool) {
179 let setter = self.set_pending_resources;
180 let serializable_resources = self.pending_serializable_resources_count;
181 setter.update(|n| {
182 if *n > 0 {
183 *n -= 1
184 }
185 });
186 if serializable {
187 serializable_resources.update(|n| {
188 if *n > 0 {
189 *n -= 1;
190 }
191 });
192 }
193 }
194
195 pub(crate) fn increment_for_resource(
197 &self,
198 serializable: bool,
199 resource: ResourceId,
200 ) {
201 let pending = self.pending;
202 let serializable_resources = self.pending_serializable_resources;
203 let local_status = self.local_status;
204 batch(move || {
205 pending.update(|n| {
206 n.insert(resource);
207 });
208 if serializable {
209 serializable_resources.update(|n| {
210 n.insert(resource);
211 });
212 local_status.update_value(|status| {
213 *status = Some(match status {
214 None => LocalStatus::SerializableOnly,
215 Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
216 Some(LocalStatus::Mixed) => LocalStatus::Mixed,
217 Some(LocalStatus::SerializableOnly) => {
218 LocalStatus::SerializableOnly
219 }
220 });
221 });
222 } else {
223 local_status.update_value(|status| {
224 *status = Some(match status {
225 None => LocalStatus::LocalOnly,
226 Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
227 Some(LocalStatus::Mixed) => LocalStatus::Mixed,
228 Some(LocalStatus::SerializableOnly) => {
229 LocalStatus::Mixed
230 }
231 });
232 });
233 }
234 });
235 }
236
237 pub fn decrement_for_resource(
239 &self,
240 serializable: bool,
241 resource: ResourceId,
242 ) {
243 let setter = self.pending;
244 let serializable_resources = self.pending_serializable_resources;
245 batch(move || {
246 setter.update(|n| {
247 n.remove(&resource);
248 });
249 if serializable {
250 serializable_resources.update(|n| {
251 n.remove(&resource);
252 });
253 }
254 });
255 }
256
257 pub fn clear(&self) {
259 batch(move || {
260 self.set_pending_resources.set(0);
261 self.pending.update(|p| p.clear());
262 self.pending_serializable_resources.update(|p| p.clear());
263 });
264 }
265
266 pub fn ready(&self) -> Memo<bool> {
268 let pending = self.pending;
269 create_memo(move |_| {
270 pending.try_with(|n| n.is_empty()).unwrap_or(false)
271 })
272 }
273}
274
275impl Default for SuspenseContext {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
281pub enum StreamChunk {
283 Sync(Oco<'static, str>),
285 Async {
287 chunks: Pin<Box<dyn Future<Output = VecDeque<StreamChunk>>>>,
289 should_block: bool,
291 },
292}
293
294impl core::fmt::Debug for StreamChunk {
295 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
296 match self {
297 StreamChunk::Sync(data) => write!(f, "StreamChunk::Sync({data:?})"),
298 StreamChunk::Async { .. } => write!(f, "StreamChunk::Async(_)"),
299 }
300 }
301}