leptos_server/
local_resource.rs

1use reactive_graph::{
2    computed::{
3        suspense::LocalResourceNotifier, ArcAsyncDerived, AsyncDerived,
4        AsyncDerivedFuture,
5    },
6    graph::{
7        AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
8        ToAnySource, ToAnySubscriber,
9    },
10    owner::use_context,
11    send_wrapper_ext::SendOption,
12    signal::{
13        guards::{AsyncPlain, Mapped, ReadGuard},
14        ArcRwSignal, RwSignal,
15    },
16    traits::{
17        DefinedAt, IsDisposed, ReadUntracked, Track, Update, With, Write,
18    },
19};
20use std::{
21    future::{pending, Future, IntoFuture},
22    panic::Location,
23};
24
25/// A reference-counted resource that only loads its data locally on the client.
26pub struct ArcLocalResource<T> {
27    data: ArcAsyncDerived<T>,
28    refetch: ArcRwSignal<usize>,
29    #[cfg(any(debug_assertions, leptos_debuginfo))]
30    defined_at: &'static Location<'static>,
31}
32
33impl<T> Clone for ArcLocalResource<T> {
34    fn clone(&self) -> Self {
35        Self {
36            data: self.data.clone(),
37            refetch: self.refetch.clone(),
38            #[cfg(any(debug_assertions, leptos_debuginfo))]
39            defined_at: self.defined_at,
40        }
41    }
42}
43
44impl<T> ArcLocalResource<T> {
45    /// Creates the resource.
46    ///
47    /// This will only begin loading data if you are on the client (i.e., if you do not have the
48    /// `ssr` feature activated).
49    #[track_caller]
50    pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
51    where
52        T: 'static,
53        Fut: Future<Output = T> + 'static,
54    {
55        let fetcher = move || {
56            let fut = fetcher();
57            async move {
58                // in SSR mode, this will simply always be pending
59                // if we try to read from it, we will trigger Suspense automatically to fall back
60                // so this will never need to return anything
61                if cfg!(feature = "ssr") {
62                    pending().await
63                } else {
64                    // LocalResources that are immediately available can cause a hydration error,
65                    // because the future *looks* like it is alredy ready (and therefore would
66                    // already have been rendered to html on the server), but in fact was ignored
67                    // on the server. the simplest way to avoid this is to ensure that we always
68                    // wait a tick before resolving any value for a localresource.
69                    any_spawner::Executor::tick().await;
70                    fut.await
71                }
72            }
73        };
74        let refetch = ArcRwSignal::new(0);
75
76        Self {
77            data: if cfg!(feature = "ssr") {
78                ArcAsyncDerived::new_mock(fetcher)
79            } else {
80                let refetch = refetch.clone();
81                ArcAsyncDerived::new_unsync(move || {
82                    refetch.track();
83                    fetcher()
84                })
85            },
86            refetch,
87            #[cfg(any(debug_assertions, leptos_debuginfo))]
88            defined_at: Location::caller(),
89        }
90    }
91
92    /// Re-runs the async function.
93    pub fn refetch(&self) {
94        *self.refetch.write() += 1;
95    }
96
97    /// Synchronously, reactively reads the current value of the resource and applies the function
98    /// `f` to its value if it is `Some(_)`.
99    #[track_caller]
100    pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
101    where
102        T: 'static,
103    {
104        self.data.try_with(|n| n.as_ref().map(f))?
105    }
106}
107
108impl<T, E> ArcLocalResource<Result<T, E>>
109where
110    T: 'static,
111    E: Clone + 'static,
112{
113    /// Applies the given function when a resource that returns `Result<T, E>`
114    /// has resolved and loaded an `Ok(_)`, rather than requiring nested `.map()`
115    /// calls over the `Option<Result<_, _>>` returned by the resource.
116    ///
117    /// This is useful when used with features like server functions, in conjunction
118    /// with `<ErrorBoundary/>` and `<Suspense/>`, when these other components are
119    /// left to handle the `None` and `Err(_)` states.
120    #[track_caller]
121    pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
122        self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
123    }
124}
125
126impl<T> IntoFuture for ArcLocalResource<T>
127where
128    T: Clone + 'static,
129{
130    type Output = T;
131    type IntoFuture = AsyncDerivedFuture<T>;
132
133    fn into_future(self) -> Self::IntoFuture {
134        if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
135            notifier.notify();
136        } else if cfg!(feature = "ssr") {
137            panic!(
138                "Reading from a LocalResource outside Suspense in `ssr` mode \
139                 will cause the response to hang, because LocalResources are \
140                 always pending on the server."
141            );
142        }
143        self.data.into_future()
144    }
145}
146
147impl<T> DefinedAt for ArcLocalResource<T> {
148    fn defined_at(&self) -> Option<&'static Location<'static>> {
149        #[cfg(any(debug_assertions, leptos_debuginfo))]
150        {
151            Some(self.defined_at)
152        }
153        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
154        {
155            None
156        }
157    }
158}
159
160impl<T> ReadUntracked for ArcLocalResource<T>
161where
162    T: 'static,
163{
164    type Value =
165        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
166
167    fn try_read_untracked(&self) -> Option<Self::Value> {
168        if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
169            notifier.notify();
170        }
171        self.data.try_read_untracked()
172    }
173}
174
175impl<T: 'static> IsDisposed for ArcLocalResource<T> {
176    #[inline(always)]
177    fn is_disposed(&self) -> bool {
178        false
179    }
180}
181
182impl<T: 'static> ToAnySource for ArcLocalResource<T> {
183    fn to_any_source(&self) -> AnySource {
184        self.data.to_any_source()
185    }
186}
187
188impl<T: 'static> ToAnySubscriber for ArcLocalResource<T> {
189    fn to_any_subscriber(&self) -> AnySubscriber {
190        self.data.to_any_subscriber()
191    }
192}
193
194impl<T> Source for ArcLocalResource<T> {
195    fn add_subscriber(&self, subscriber: AnySubscriber) {
196        self.data.add_subscriber(subscriber)
197    }
198
199    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
200        self.data.remove_subscriber(subscriber);
201    }
202
203    fn clear_subscribers(&self) {
204        self.data.clear_subscribers();
205    }
206}
207
208impl<T> ReactiveNode for ArcLocalResource<T> {
209    fn mark_dirty(&self) {
210        self.data.mark_dirty();
211    }
212
213    fn mark_check(&self) {
214        self.data.mark_check();
215    }
216
217    fn mark_subscribers_check(&self) {
218        self.data.mark_subscribers_check();
219    }
220
221    fn update_if_necessary(&self) -> bool {
222        self.data.update_if_necessary()
223    }
224}
225
226impl<T> Subscriber for ArcLocalResource<T> {
227    fn add_source(&self, source: AnySource) {
228        self.data.add_source(source);
229    }
230
231    fn clear_sources(&self, subscriber: &AnySubscriber) {
232        self.data.clear_sources(subscriber);
233    }
234}
235
236/// A resource that only loads its data locally on the client.
237pub struct LocalResource<T> {
238    data: AsyncDerived<T>,
239    refetch: RwSignal<usize>,
240    #[cfg(any(debug_assertions, leptos_debuginfo))]
241    defined_at: &'static Location<'static>,
242}
243
244impl<T> Clone for LocalResource<T> {
245    fn clone(&self) -> Self {
246        *self
247    }
248}
249
250impl<T> Copy for LocalResource<T> {}
251
252impl<T> LocalResource<T> {
253    /// Creates the resource.
254    ///
255    /// This will only begin loading data if you are on the client (i.e., if you do not have the
256    /// `ssr` feature activated).
257    #[track_caller]
258    pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
259    where
260        T: 'static,
261        Fut: Future<Output = T> + 'static,
262    {
263        let fetcher = move || {
264            let fut = fetcher();
265            async move {
266                // in SSR mode, this will simply always be pending
267                // if we try to read from it, we will trigger Suspense automatically to fall back
268                // so this will never need to return anything
269                if cfg!(feature = "ssr") {
270                    pending().await
271                } else {
272                    // LocalResources that are immediately available can cause a hydration error,
273                    // because the future *looks* like it is alredy ready (and therefore would
274                    // already have been rendered to html on the server), but in fact was ignored
275                    // on the server. the simplest way to avoid this is to ensure that we always
276                    // wait a tick before resolving any value for a localresource.
277                    any_spawner::Executor::tick().await;
278                    fut.await
279                }
280            }
281        };
282        let refetch = RwSignal::new(0);
283
284        Self {
285            data: if cfg!(feature = "ssr") {
286                AsyncDerived::new_mock(fetcher)
287            } else {
288                AsyncDerived::new_unsync_threadsafe_storage(move || {
289                    refetch.track();
290                    fetcher()
291                })
292            },
293            refetch,
294            #[cfg(any(debug_assertions, leptos_debuginfo))]
295            defined_at: Location::caller(),
296        }
297    }
298
299    /// Re-runs the async function.
300    pub fn refetch(&self) {
301        self.refetch.try_update(|n| *n += 1);
302    }
303
304    /// Synchronously, reactively reads the current value of the resource and applies the function
305    /// `f` to its value if it is `Some(_)`.
306    #[track_caller]
307    pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
308    where
309        T: 'static,
310    {
311        self.data.try_with(|n| n.as_ref().map(f))?
312    }
313}
314
315impl<T, E> LocalResource<Result<T, E>>
316where
317    T: 'static,
318    E: Clone + 'static,
319{
320    /// Applies the given function when a resource that returns `Result<T, E>`
321    /// has resolved and loaded an `Ok(_)`, rather than requiring nested `.map()`
322    /// calls over the `Option<Result<_, _>>` returned by the resource.
323    ///
324    /// This is useful when used with features like server functions, in conjunction
325    /// with `<ErrorBoundary/>` and `<Suspense/>`, when these other components are
326    /// left to handle the `None` and `Err(_)` states.
327    #[track_caller]
328    pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
329        self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
330    }
331}
332
333impl<T> IntoFuture for LocalResource<T>
334where
335    T: Clone + 'static,
336{
337    type Output = T;
338    type IntoFuture = AsyncDerivedFuture<T>;
339
340    fn into_future(self) -> Self::IntoFuture {
341        if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
342            notifier.notify();
343        } else if cfg!(feature = "ssr") {
344            panic!(
345                "Reading from a LocalResource outside Suspense in `ssr` mode \
346                 will cause the response to hang, because LocalResources are \
347                 always pending on the server."
348            );
349        }
350        self.data.into_future()
351    }
352}
353
354impl<T> DefinedAt for LocalResource<T> {
355    fn defined_at(&self) -> Option<&'static Location<'static>> {
356        #[cfg(any(debug_assertions, leptos_debuginfo))]
357        {
358            Some(self.defined_at)
359        }
360        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
361        {
362            None
363        }
364    }
365}
366
367impl<T> ReadUntracked for LocalResource<T>
368where
369    T: 'static,
370{
371    type Value =
372        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
373
374    fn try_read_untracked(&self) -> Option<Self::Value> {
375        if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
376            notifier.notify();
377        }
378        self.data.try_read_untracked()
379    }
380}
381
382impl<T: 'static> IsDisposed for LocalResource<T> {
383    fn is_disposed(&self) -> bool {
384        self.data.is_disposed()
385    }
386}
387
388impl<T: 'static> ToAnySource for LocalResource<T>
389where
390    T: 'static,
391{
392    fn to_any_source(&self) -> AnySource {
393        self.data.to_any_source()
394    }
395}
396
397impl<T: 'static> ToAnySubscriber for LocalResource<T>
398where
399    T: 'static,
400{
401    fn to_any_subscriber(&self) -> AnySubscriber {
402        self.data.to_any_subscriber()
403    }
404}
405
406impl<T> Source for LocalResource<T>
407where
408    T: 'static,
409{
410    fn add_subscriber(&self, subscriber: AnySubscriber) {
411        self.data.add_subscriber(subscriber)
412    }
413
414    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
415        self.data.remove_subscriber(subscriber);
416    }
417
418    fn clear_subscribers(&self) {
419        self.data.clear_subscribers();
420    }
421}
422
423impl<T> ReactiveNode for LocalResource<T>
424where
425    T: 'static,
426{
427    fn mark_dirty(&self) {
428        self.data.mark_dirty();
429    }
430
431    fn mark_check(&self) {
432        self.data.mark_check();
433    }
434
435    fn mark_subscribers_check(&self) {
436        self.data.mark_subscribers_check();
437    }
438
439    fn update_if_necessary(&self) -> bool {
440        self.data.update_if_necessary()
441    }
442}
443
444impl<T> Subscriber for LocalResource<T>
445where
446    T: 'static,
447{
448    fn add_source(&self, source: AnySource) {
449        self.data.add_source(source);
450    }
451
452    fn clear_sources(&self, subscriber: &AnySubscriber) {
453        self.data.clear_sources(subscriber);
454    }
455}
456
457impl<T: 'static> From<ArcLocalResource<T>> for LocalResource<T> {
458    fn from(arc: ArcLocalResource<T>) -> Self {
459        Self {
460            data: arc.data.into(),
461            refetch: arc.refetch.into(),
462            #[cfg(any(debug_assertions, leptos_debuginfo))]
463            defined_at: arc.defined_at,
464        }
465    }
466}
467
468impl<T: 'static> From<LocalResource<T>> for ArcLocalResource<T> {
469    fn from(local: LocalResource<T>) -> Self {
470        Self {
471            data: local.data.into(),
472            refetch: local.refetch.into(),
473            #[cfg(any(debug_assertions, leptos_debuginfo))]
474            defined_at: local.defined_at,
475        }
476    }
477}