leptos_reactive/
suspense.rs

1//! Types that handle asynchronous data loading via `<Suspense/>`.
2
3use crate::{
4    batch, create_isomorphic_effect, create_memo, create_rw_signal,
5    create_signal, oco::Oco, queue_microtask, store_value, Memo, ReadSignal,
6    ResourceId, RwSignal, SignalSet, SignalUpdate, SignalWith, StoredValue,
7    WriteSignal,
8};
9use futures::Future;
10use rustc_hash::FxHashSet;
11use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc};
12
/// Tracks [`Resource`](crate::Resource)s that are read under a suspense context,
/// i.e., within a [`Suspense`](https://docs.rs/leptos_core/latest/leptos_core/fn.Suspense.html) component.
///
/// Two bookkeeping schemes live side by side in this type: plain counters
/// (`pending_resources`, `pending_serializable_resources_count`), driven by
/// `increment`/`decrement`, and sets of resource IDs (`pending`,
/// `pending_serializable_resources`), driven by
/// `increment_for_resource`/`decrement_for_resource`.
///
/// The type is `Copy`: every field is a handle, so copies refer to the same
/// underlying signals and stored values.
#[derive(Copy, Clone, Debug)]
pub struct SuspenseContext {
    /// The number of resources that are currently pending.
    pub pending_resources: ReadSignal<usize>,
    // Write half of the `pending_resources` counter signal.
    set_pending_resources: WriteSignal<usize>,
    // NOTE: For correctness reasons, the ID-set fields below are the
    // representation this type really needs to move to. The counter-based
    // fields are kept alongside them for API stability.
    /// IDs of the resources that are currently pending. This set drives
    /// `to_future`, `none_pending` and `ready`, and its signal ID is what
    /// the `Hash`/`PartialEq` implementations compare.
    pub(crate) pending: RwSignal<FxHashSet<ResourceId>>,
    /// IDs of the pending resources that were registered as serializable.
    pub(crate) pending_serializable_resources: RwSignal<FxHashSet<ResourceId>>,
    /// Counter-based equivalent of `pending_serializable_resources`.
    pub(crate) pending_serializable_resources_count: RwSignal<usize>,
    /// Which kinds of resources (serializable vs. local) have been registered;
    /// `None` until the first resource is registered.
    pub(crate) local_status: StoredValue<Option<LocalStatus>>,
    /// Flag returned by `should_block()`; initialized to `false` by `new()`.
    pub(crate) should_block: StoredValue<bool>,
}
28
/// Summarizes which kinds of resources (serializable vs. local, i.e.
/// non-serializable) have been registered with a [`SuspenseContext`] through
/// `increment` or `increment_for_resource`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum LocalStatus {
    /// The first registered resource was non-serializable. Registering a
    /// serializable resource afterwards leaves this status unchanged.
    LocalOnly,
    /// A non-serializable resource was registered while the status was
    /// `SerializableOnly`.
    Mixed,
    /// Only serializable resources have been registered so far.
    SerializableOnly,
}
35
/// A single, global suspense context that will be checked when resources
/// are read. This won't be "blocked" by lower suspense components. This is
/// useful for e.g., holding route transitions.
///
/// The inner context is held in an `Rc<RefCell<_>>`, so cloning this wrapper
/// yields another handle to the same shared context rather than a new one.
#[derive(Clone, Debug)]
pub struct GlobalSuspenseContext(Rc<RefCell<SuspenseContext>>);
41
42impl GlobalSuspenseContext {
43    /// Creates an empty global suspense context.
44    pub fn new() -> Self {
45        Self(Rc::new(RefCell::new(SuspenseContext::new())))
46    }
47
48    /// Runs a function with a reference to the underlying suspense context.
49    pub fn with_inner<T>(&self, f: impl FnOnce(&SuspenseContext) -> T) -> T {
50        f(&self.0.borrow())
51    }
52
53    /// Runs a function with a reference to the underlying suspense context.
54    pub fn reset(&self) {
55        let mut inner = self.0.borrow_mut();
56        _ = std::mem::replace(&mut *inner, SuspenseContext::new());
57    }
58}
59
60impl Default for GlobalSuspenseContext {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl SuspenseContext {
67    /// Whether the suspense contains local resources at this moment,
68    /// and therefore can't be serialized
69    pub fn has_local_only(&self) -> bool {
70        matches!(self.local_status.get_value(), Some(LocalStatus::LocalOnly))
71    }
72
73    /// Whether the suspense contains any local resources at this moment.
74    pub fn has_any_local(&self) -> bool {
75        matches!(
76            self.local_status.get_value(),
77            Some(LocalStatus::LocalOnly) | Some(LocalStatus::Mixed)
78        )
79    }
80
81    /// Whether any blocking resources are read under this suspense context,
82    /// meaning the HTML stream should not begin until it has resolved.
83    pub fn should_block(&self) -> bool {
84        self.should_block.get_value()
85    }
86
87    /// Returns a `Future` that resolves when this suspense is resolved.
88    pub fn to_future(&self) -> impl Future<Output = ()> {
89        use futures::StreamExt;
90
91        let pending = self.pending;
92        let (tx, mut rx) = futures::channel::mpsc::channel(1);
93        let tx = RefCell::new(tx);
94        queue_microtask(move || {
95            create_isomorphic_effect(move |_| {
96                if pending.with(|p| p.is_empty()) {
97                    _ = tx.borrow_mut().try_send(());
98                }
99            });
100        });
101        async move {
102            rx.next().await;
103        }
104    }
105
106    /// Reactively checks whether there are no pending resources in the suspense.
107    pub fn none_pending(&self) -> bool {
108        self.pending.with(|p| p.is_empty())
109    }
110}
111
112impl std::hash::Hash for SuspenseContext {
113    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
114        self.pending.id.hash(state);
115    }
116}
117
118impl PartialEq for SuspenseContext {
119    fn eq(&self, other: &Self) -> bool {
120        self.pending.id == other.pending.id
121    }
122}
123
124impl Eq for SuspenseContext {}
125
126impl SuspenseContext {
127    /// Creates an empty suspense context.
128    pub fn new() -> Self {
129        let (pending_resources, set_pending_resources) = create_signal(0); // can be removed when possible
130        let pending_serializable_resources =
131            create_rw_signal(Default::default());
132        let pending_serializable_resources_count = create_rw_signal(0); // can be removed when possible
133        let local_status = store_value(None);
134        let should_block = store_value(false);
135        let pending = create_rw_signal(Default::default());
136        Self {
137            pending,
138            pending_resources,
139            set_pending_resources,
140            pending_serializable_resources,
141            pending_serializable_resources_count,
142            local_status,
143            should_block,
144        }
145    }
146
147    /// Notifies the suspense context that a new resource is now pending.
148    pub fn increment(&self, serializable: bool) {
149        let setter = self.set_pending_resources;
150        let serializable_resources = self.pending_serializable_resources_count;
151        let local_status = self.local_status;
152        setter.update(|n| *n += 1);
153        if serializable {
154            serializable_resources.update(|n| *n += 1);
155            local_status.update_value(|status| {
156                *status = Some(match status {
157                    None => LocalStatus::SerializableOnly,
158                    Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
159                    Some(LocalStatus::Mixed) => LocalStatus::Mixed,
160                    Some(LocalStatus::SerializableOnly) => {
161                        LocalStatus::SerializableOnly
162                    }
163                });
164            });
165        } else {
166            local_status.update_value(|status| {
167                *status = Some(match status {
168                    None => LocalStatus::LocalOnly,
169                    Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
170                    Some(LocalStatus::Mixed) => LocalStatus::Mixed,
171                    Some(LocalStatus::SerializableOnly) => LocalStatus::Mixed,
172                });
173            });
174        }
175    }
176
177    /// Notifies the suspense context that a resource has resolved.
178    pub fn decrement(&self, serializable: bool) {
179        let setter = self.set_pending_resources;
180        let serializable_resources = self.pending_serializable_resources_count;
181        setter.update(|n| {
182            if *n > 0 {
183                *n -= 1
184            }
185        });
186        if serializable {
187            serializable_resources.update(|n| {
188                if *n > 0 {
189                    *n -= 1;
190                }
191            });
192        }
193    }
194
195    /// Notifies the suspense context that a new resource is now pending.
196    pub(crate) fn increment_for_resource(
197        &self,
198        serializable: bool,
199        resource: ResourceId,
200    ) {
201        let pending = self.pending;
202        let serializable_resources = self.pending_serializable_resources;
203        let local_status = self.local_status;
204        batch(move || {
205            pending.update(|n| {
206                n.insert(resource);
207            });
208            if serializable {
209                serializable_resources.update(|n| {
210                    n.insert(resource);
211                });
212                local_status.update_value(|status| {
213                    *status = Some(match status {
214                        None => LocalStatus::SerializableOnly,
215                        Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
216                        Some(LocalStatus::Mixed) => LocalStatus::Mixed,
217                        Some(LocalStatus::SerializableOnly) => {
218                            LocalStatus::SerializableOnly
219                        }
220                    });
221                });
222            } else {
223                local_status.update_value(|status| {
224                    *status = Some(match status {
225                        None => LocalStatus::LocalOnly,
226                        Some(LocalStatus::LocalOnly) => LocalStatus::LocalOnly,
227                        Some(LocalStatus::Mixed) => LocalStatus::Mixed,
228                        Some(LocalStatus::SerializableOnly) => {
229                            LocalStatus::Mixed
230                        }
231                    });
232                });
233            }
234        });
235    }
236
237    /// Notifies the suspense context that a resource has resolved.
238    pub fn decrement_for_resource(
239        &self,
240        serializable: bool,
241        resource: ResourceId,
242    ) {
243        let setter = self.pending;
244        let serializable_resources = self.pending_serializable_resources;
245        batch(move || {
246            setter.update(|n| {
247                n.remove(&resource);
248            });
249            if serializable {
250                serializable_resources.update(|n| {
251                    n.remove(&resource);
252                });
253            }
254        });
255    }
256
257    /// Resets the counter of pending resources.
258    pub fn clear(&self) {
259        batch(move || {
260            self.set_pending_resources.set(0);
261            self.pending.update(|p| p.clear());
262            self.pending_serializable_resources.update(|p| p.clear());
263        });
264    }
265
266    /// Tests whether all of the pending resources have resolved.
267    pub fn ready(&self) -> Memo<bool> {
268        let pending = self.pending;
269        create_memo(move |_| {
270            pending.try_with(|n| n.is_empty()).unwrap_or(false)
271        })
272    }
273}
274
275impl Default for SuspenseContext {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
/// Represents a chunk in a stream of HTML.
///
/// Chunks nest: an `Async` chunk resolves to further `StreamChunk`s, which
/// may themselves be `Sync` or `Async`.
pub enum StreamChunk {
    /// A chunk of synchronous HTML.
    Sync(Oco<'static, str>),
    /// A future that resolves to a queue of additional chunks.
    Async {
        /// A pinned, boxed future yielding the HTML chunks this contains.
        /// The trait object carries no `Send` bound.
        chunks: Pin<Box<dyn Future<Output = VecDeque<StreamChunk>>>>,
        /// Whether this should block the stream.
        should_block: bool,
    },
}
293
294impl core::fmt::Debug for StreamChunk {
295    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
296        match self {
297            StreamChunk::Sync(data) => write!(f, "StreamChunk::Sync({data:?})"),
298            StreamChunk::Async { .. } => write!(f, "StreamChunk::Async(_)"),
299        }
300    }
301}