1use reactive_graph::{
2 computed::{
3 suspense::LocalResourceNotifier, ArcAsyncDerived, AsyncDerived,
4 AsyncDerivedFuture,
5 },
6 graph::{
7 AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
8 ToAnySource, ToAnySubscriber,
9 },
10 owner::use_context,
11 send_wrapper_ext::SendOption,
12 signal::{
13 guards::{AsyncPlain, Mapped, ReadGuard},
14 ArcRwSignal, RwSignal,
15 },
16 traits::{
17 DefinedAt, IsDisposed, ReadUntracked, Track, Update, With, Write,
18 },
19};
20use std::{
21 future::{pending, Future, IntoFuture},
22 panic::Location,
23};
24
/// A cloneable resource handle made of an [`ArcAsyncDerived`] built from the
/// user's fetcher and an [`ArcRwSignal`] counter that `refetch()` increments.
pub struct ArcLocalResource<T> {
    /// Async derived node created from the (wrapped) fetcher in `new`.
    data: ArcAsyncDerived<T>,
    /// Counter incremented by `refetch()`; outside `ssr` it is tracked before
    /// each fetcher run.
    refetch: ArcRwSignal<usize>,
    /// Caller location recorded at construction; only present with
    /// `debug_assertions` or the `leptos_debuginfo` cfg.
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    defined_at: &'static Location<'static>,
}
32
33impl<T> Clone for ArcLocalResource<T> {
34 fn clone(&self) -> Self {
35 Self {
36 data: self.data.clone(),
37 refetch: self.refetch.clone(),
38 #[cfg(any(debug_assertions, leptos_debuginfo))]
39 defined_at: self.defined_at,
40 }
41 }
42}
43
44impl<T> ArcLocalResource<T> {
45 #[track_caller]
50 pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
51 where
52 T: 'static,
53 Fut: Future<Output = T> + 'static,
54 {
55 let fetcher = move || {
56 let fut = fetcher();
57 async move {
58 if cfg!(feature = "ssr") {
62 pending().await
63 } else {
64 any_spawner::Executor::tick().await;
70 fut.await
71 }
72 }
73 };
74 let refetch = ArcRwSignal::new(0);
75
76 Self {
77 data: if cfg!(feature = "ssr") {
78 ArcAsyncDerived::new_mock(fetcher)
79 } else {
80 let refetch = refetch.clone();
81 ArcAsyncDerived::new_unsync(move || {
82 refetch.track();
83 fetcher()
84 })
85 },
86 refetch,
87 #[cfg(any(debug_assertions, leptos_debuginfo))]
88 defined_at: Location::caller(),
89 }
90 }
91
92 pub fn refetch(&self) {
94 *self.refetch.write() += 1;
95 }
96
97 #[track_caller]
100 pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
101 where
102 T: 'static,
103 {
104 self.data.try_with(|n| n.as_ref().map(f))?
105 }
106}
107
108impl<T, E> ArcLocalResource<Result<T, E>>
109where
110 T: 'static,
111 E: Clone + 'static,
112{
113 #[track_caller]
121 pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
122 self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
123 }
124}
125
126impl<T> IntoFuture for ArcLocalResource<T>
127where
128 T: Clone + 'static,
129{
130 type Output = T;
131 type IntoFuture = AsyncDerivedFuture<T>;
132
133 fn into_future(self) -> Self::IntoFuture {
134 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
135 notifier.notify();
136 } else if cfg!(feature = "ssr") {
137 panic!(
138 "Reading from a LocalResource outside Suspense in `ssr` mode \
139 will cause the response to hang, because LocalResources are \
140 always pending on the server."
141 );
142 }
143 self.data.into_future()
144 }
145}
146
147impl<T> DefinedAt for ArcLocalResource<T> {
148 fn defined_at(&self) -> Option<&'static Location<'static>> {
149 #[cfg(any(debug_assertions, leptos_debuginfo))]
150 {
151 Some(self.defined_at)
152 }
153 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
154 {
155 None
156 }
157 }
158}
159
160impl<T> ReadUntracked for ArcLocalResource<T>
161where
162 T: 'static,
163{
164 type Value =
165 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
166
167 fn try_read_untracked(&self) -> Option<Self::Value> {
168 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
169 notifier.notify();
170 }
171 self.data.try_read_untracked()
172 }
173}
174
impl<T: 'static> IsDisposed for ArcLocalResource<T> {
    /// Always returns `false`: this implementation never reports the handle
    /// as disposed.
    #[inline(always)]
    fn is_disposed(&self) -> bool {
        false
    }
}
181
182impl<T: 'static> ToAnySource for ArcLocalResource<T> {
183 fn to_any_source(&self) -> AnySource {
184 self.data.to_any_source()
185 }
186}
187
188impl<T: 'static> ToAnySubscriber for ArcLocalResource<T> {
189 fn to_any_subscriber(&self) -> AnySubscriber {
190 self.data.to_any_subscriber()
191 }
192}
193
194impl<T> Source for ArcLocalResource<T> {
195 fn add_subscriber(&self, subscriber: AnySubscriber) {
196 self.data.add_subscriber(subscriber)
197 }
198
199 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
200 self.data.remove_subscriber(subscriber);
201 }
202
203 fn clear_subscribers(&self) {
204 self.data.clear_subscribers();
205 }
206}
207
208impl<T> ReactiveNode for ArcLocalResource<T> {
209 fn mark_dirty(&self) {
210 self.data.mark_dirty();
211 }
212
213 fn mark_check(&self) {
214 self.data.mark_check();
215 }
216
217 fn mark_subscribers_check(&self) {
218 self.data.mark_subscribers_check();
219 }
220
221 fn update_if_necessary(&self) -> bool {
222 self.data.update_if_necessary()
223 }
224}
225
226impl<T> Subscriber for ArcLocalResource<T> {
227 fn add_source(&self, source: AnySource) {
228 self.data.add_source(source);
229 }
230
231 fn clear_sources(&self, subscriber: &AnySubscriber) {
232 self.data.clear_sources(subscriber);
233 }
234}
235
/// A `Copy` resource handle made of an [`AsyncDerived`] built from the user's
/// fetcher and an [`RwSignal`] counter that `refetch()` increments.
pub struct LocalResource<T> {
    /// Async derived node created from the (wrapped) fetcher in `new`.
    data: AsyncDerived<T>,
    /// Counter incremented by `refetch()`; outside `ssr` it is tracked before
    /// each fetcher run.
    refetch: RwSignal<usize>,
    /// Caller location recorded at construction; only present with
    /// `debug_assertions` or the `leptos_debuginfo` cfg.
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    defined_at: &'static Location<'static>,
}
243
impl<T> Clone for LocalResource<T> {
    /// Returns a copy of the handle; `LocalResource` is `Copy`, so cloning is
    /// a plain dereference and needs no `T: Clone` bound.
    fn clone(&self) -> Self {
        *self
    }
}
249
// Written by hand (not derived) so that no `T: Copy` bound is imposed.
impl<T> Copy for LocalResource<T> {}
251
252impl<T> LocalResource<T> {
253 #[track_caller]
258 pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
259 where
260 T: 'static,
261 Fut: Future<Output = T> + 'static,
262 {
263 let fetcher = move || {
264 let fut = fetcher();
265 async move {
266 if cfg!(feature = "ssr") {
270 pending().await
271 } else {
272 any_spawner::Executor::tick().await;
278 fut.await
279 }
280 }
281 };
282 let refetch = RwSignal::new(0);
283
284 Self {
285 data: if cfg!(feature = "ssr") {
286 AsyncDerived::new_mock(fetcher)
287 } else {
288 AsyncDerived::new_unsync_threadsafe_storage(move || {
289 refetch.track();
290 fetcher()
291 })
292 },
293 refetch,
294 #[cfg(any(debug_assertions, leptos_debuginfo))]
295 defined_at: Location::caller(),
296 }
297 }
298
299 pub fn refetch(&self) {
301 self.refetch.try_update(|n| *n += 1);
302 }
303
304 #[track_caller]
307 pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
308 where
309 T: 'static,
310 {
311 self.data.try_with(|n| n.as_ref().map(f))?
312 }
313}
314
315impl<T, E> LocalResource<Result<T, E>>
316where
317 T: 'static,
318 E: Clone + 'static,
319{
320 #[track_caller]
328 pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
329 self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
330 }
331}
332
333impl<T> IntoFuture for LocalResource<T>
334where
335 T: Clone + 'static,
336{
337 type Output = T;
338 type IntoFuture = AsyncDerivedFuture<T>;
339
340 fn into_future(self) -> Self::IntoFuture {
341 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
342 notifier.notify();
343 } else if cfg!(feature = "ssr") {
344 panic!(
345 "Reading from a LocalResource outside Suspense in `ssr` mode \
346 will cause the response to hang, because LocalResources are \
347 always pending on the server."
348 );
349 }
350 self.data.into_future()
351 }
352}
353
354impl<T> DefinedAt for LocalResource<T> {
355 fn defined_at(&self) -> Option<&'static Location<'static>> {
356 #[cfg(any(debug_assertions, leptos_debuginfo))]
357 {
358 Some(self.defined_at)
359 }
360 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
361 {
362 None
363 }
364 }
365}
366
367impl<T> ReadUntracked for LocalResource<T>
368where
369 T: 'static,
370{
371 type Value =
372 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
373
374 fn try_read_untracked(&self) -> Option<Self::Value> {
375 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
376 notifier.notify();
377 }
378 self.data.try_read_untracked()
379 }
380}
381
382impl<T: 'static> IsDisposed for LocalResource<T> {
383 fn is_disposed(&self) -> bool {
384 self.data.is_disposed()
385 }
386}
387
388impl<T: 'static> ToAnySource for LocalResource<T>
389where
390 T: 'static,
391{
392 fn to_any_source(&self) -> AnySource {
393 self.data.to_any_source()
394 }
395}
396
397impl<T: 'static> ToAnySubscriber for LocalResource<T>
398where
399 T: 'static,
400{
401 fn to_any_subscriber(&self) -> AnySubscriber {
402 self.data.to_any_subscriber()
403 }
404}
405
406impl<T> Source for LocalResource<T>
407where
408 T: 'static,
409{
410 fn add_subscriber(&self, subscriber: AnySubscriber) {
411 self.data.add_subscriber(subscriber)
412 }
413
414 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
415 self.data.remove_subscriber(subscriber);
416 }
417
418 fn clear_subscribers(&self) {
419 self.data.clear_subscribers();
420 }
421}
422
423impl<T> ReactiveNode for LocalResource<T>
424where
425 T: 'static,
426{
427 fn mark_dirty(&self) {
428 self.data.mark_dirty();
429 }
430
431 fn mark_check(&self) {
432 self.data.mark_check();
433 }
434
435 fn mark_subscribers_check(&self) {
436 self.data.mark_subscribers_check();
437 }
438
439 fn update_if_necessary(&self) -> bool {
440 self.data.update_if_necessary()
441 }
442}
443
444impl<T> Subscriber for LocalResource<T>
445where
446 T: 'static,
447{
448 fn add_source(&self, source: AnySource) {
449 self.data.add_source(source);
450 }
451
452 fn clear_sources(&self, subscriber: &AnySubscriber) {
453 self.data.clear_sources(subscriber);
454 }
455}
456
457impl<T: 'static> From<ArcLocalResource<T>> for LocalResource<T> {
458 fn from(arc: ArcLocalResource<T>) -> Self {
459 Self {
460 data: arc.data.into(),
461 refetch: arc.refetch.into(),
462 #[cfg(any(debug_assertions, leptos_debuginfo))]
463 defined_at: arc.defined_at,
464 }
465 }
466}
467
468impl<T: 'static> From<LocalResource<T>> for ArcLocalResource<T> {
469 fn from(local: LocalResource<T>) -> Self {
470 Self {
471 data: local.data.into(),
472 refetch: local.refetch.into(),
473 #[cfg(any(debug_assertions, leptos_debuginfo))]
474 defined_at: local.defined_at,
475 }
476 }
477}