1use reactive_graph::{
2 computed::{
3 suspense::LocalResourceNotifier, ArcAsyncDerived, AsyncDerived,
4 AsyncDerivedFuture,
5 },
6 graph::{
7 AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
8 ToAnySource, ToAnySubscriber,
9 },
10 owner::use_context,
11 send_wrapper_ext::SendOption,
12 signal::{
13 guards::{AsyncPlain, Mapped, ReadGuard},
14 ArcRwSignal, RwSignal,
15 },
16 traits::{
17 DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
18 Update, With, Write,
19 },
20};
21use std::{
22 future::{pending, Future, IntoFuture},
23 ops::{Deref, DerefMut},
24 panic::Location,
25};
26
27pub struct ArcLocalResource<T> {
29 data: ArcAsyncDerived<T>,
30 refetch: ArcRwSignal<usize>,
31 #[cfg(any(debug_assertions, leptos_debuginfo))]
32 defined_at: &'static Location<'static>,
33}
34
35impl<T> Clone for ArcLocalResource<T> {
36 fn clone(&self) -> Self {
37 Self {
38 data: self.data.clone(),
39 refetch: self.refetch.clone(),
40 #[cfg(any(debug_assertions, leptos_debuginfo))]
41 defined_at: self.defined_at,
42 }
43 }
44}
45
46impl<T> Deref for ArcLocalResource<T> {
47 type Target = ArcAsyncDerived<T>;
48
49 fn deref(&self) -> &Self::Target {
50 &self.data
51 }
52}
53
54impl<T> ArcLocalResource<T> {
55 #[track_caller]
60 pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
61 where
62 T: 'static,
63 Fut: Future<Output = T> + 'static,
64 {
65 let fetcher = move || {
66 let fut = fetcher();
67 async move {
68 if cfg!(feature = "ssr") {
72 pending().await
73 } else {
74 any_spawner::Executor::tick().await;
80 fut.await
81 }
82 }
83 };
84 let refetch = ArcRwSignal::new(0);
85
86 Self {
87 data: if cfg!(feature = "ssr") {
88 ArcAsyncDerived::new_mock(fetcher)
89 } else {
90 let refetch = refetch.clone();
91 ArcAsyncDerived::new_unsync(move || {
92 refetch.track();
93 fetcher()
94 })
95 },
96 refetch,
97 #[cfg(any(debug_assertions, leptos_debuginfo))]
98 defined_at: Location::caller(),
99 }
100 }
101
102 pub fn refetch(&self) {
104 *self.refetch.write() += 1;
105 }
106
107 #[track_caller]
110 pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
111 where
112 T: 'static,
113 {
114 self.data.try_with(|n| n.as_ref().map(f))?
115 }
116}
117
118impl<T, E> ArcLocalResource<Result<T, E>>
119where
120 T: 'static,
121 E: Clone + 'static,
122{
123 #[track_caller]
131 pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
132 self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
133 }
134}
135
136impl<T> IntoFuture for ArcLocalResource<T>
137where
138 T: Clone + 'static,
139{
140 type Output = T;
141 type IntoFuture = AsyncDerivedFuture<T>;
142
143 fn into_future(self) -> Self::IntoFuture {
144 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
145 notifier.notify();
146 } else if cfg!(feature = "ssr") {
147 panic!(
148 "Reading from a LocalResource outside Suspense in `ssr` mode \
149 will cause the response to hang, because LocalResources are \
150 always pending on the server."
151 );
152 }
153 self.data.into_future()
154 }
155}
156
157impl<T> DefinedAt for ArcLocalResource<T> {
158 fn defined_at(&self) -> Option<&'static Location<'static>> {
159 #[cfg(any(debug_assertions, leptos_debuginfo))]
160 {
161 Some(self.defined_at)
162 }
163 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
164 {
165 None
166 }
167 }
168}
169
170impl<T> Notify for ArcLocalResource<T>
171where
172 T: 'static,
173{
174 fn notify(&self) {
175 self.data.notify()
176 }
177}
178
179impl<T> Write for ArcLocalResource<T>
180where
181 T: 'static,
182{
183 type Value = Option<T>;
184
185 fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
186 self.data.try_write()
187 }
188
189 fn try_write_untracked(
190 &self,
191 ) -> Option<impl DerefMut<Target = Self::Value>> {
192 self.data.try_write_untracked()
193 }
194}
195
196impl<T> ReadUntracked for ArcLocalResource<T>
197where
198 T: 'static,
199{
200 type Value =
201 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
202
203 fn try_read_untracked(&self) -> Option<Self::Value> {
204 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
205 notifier.notify();
206 }
207 self.data.try_read_untracked()
208 }
209}
210
211impl<T: 'static> IsDisposed for ArcLocalResource<T> {
212 #[inline(always)]
213 fn is_disposed(&self) -> bool {
214 false
215 }
216}
217
218impl<T: 'static> ToAnySource for ArcLocalResource<T> {
219 fn to_any_source(&self) -> AnySource {
220 self.data.to_any_source()
221 }
222}
223
224impl<T: 'static> ToAnySubscriber for ArcLocalResource<T> {
225 fn to_any_subscriber(&self) -> AnySubscriber {
226 self.data.to_any_subscriber()
227 }
228}
229
230impl<T> Source for ArcLocalResource<T> {
231 fn add_subscriber(&self, subscriber: AnySubscriber) {
232 self.data.add_subscriber(subscriber)
233 }
234
235 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
236 self.data.remove_subscriber(subscriber);
237 }
238
239 fn clear_subscribers(&self) {
240 self.data.clear_subscribers();
241 }
242}
243
244impl<T> ReactiveNode for ArcLocalResource<T> {
245 fn mark_dirty(&self) {
246 self.data.mark_dirty();
247 }
248
249 fn mark_check(&self) {
250 self.data.mark_check();
251 }
252
253 fn mark_subscribers_check(&self) {
254 self.data.mark_subscribers_check();
255 }
256
257 fn update_if_necessary(&self) -> bool {
258 self.data.update_if_necessary()
259 }
260}
261
262impl<T> Subscriber for ArcLocalResource<T> {
263 fn add_source(&self, source: AnySource) {
264 self.data.add_source(source);
265 }
266
267 fn clear_sources(&self, subscriber: &AnySubscriber) {
268 self.data.clear_sources(subscriber);
269 }
270}
271
272pub struct LocalResource<T> {
274 data: AsyncDerived<T>,
275 refetch: RwSignal<usize>,
276 #[cfg(any(debug_assertions, leptos_debuginfo))]
277 defined_at: &'static Location<'static>,
278}
279
280impl<T> Deref for LocalResource<T> {
281 type Target = AsyncDerived<T>;
282
283 fn deref(&self) -> &Self::Target {
284 &self.data
285 }
286}
287
288impl<T> Clone for LocalResource<T> {
289 fn clone(&self) -> Self {
290 *self
291 }
292}
293
294impl<T> Copy for LocalResource<T> {}
295
296impl<T> LocalResource<T> {
297 #[track_caller]
302 pub fn new<Fut>(fetcher: impl Fn() -> Fut + 'static) -> Self
303 where
304 T: 'static,
305 Fut: Future<Output = T> + 'static,
306 {
307 let fetcher = move || {
308 let fut = fetcher();
309 async move {
310 if cfg!(feature = "ssr") {
314 pending().await
315 } else {
316 any_spawner::Executor::tick().await;
322 fut.await
323 }
324 }
325 };
326 let refetch = RwSignal::new(0);
327
328 Self {
329 data: if cfg!(feature = "ssr") {
330 AsyncDerived::new_mock(fetcher)
331 } else {
332 AsyncDerived::new_unsync_threadsafe_storage(move || {
333 refetch.track();
334 fetcher()
335 })
336 },
337 refetch,
338 #[cfg(any(debug_assertions, leptos_debuginfo))]
339 defined_at: Location::caller(),
340 }
341 }
342
343 pub fn refetch(&self) {
345 self.refetch.try_update(|n| *n += 1);
346 }
347
348 #[track_caller]
351 pub fn map<U>(&self, f: impl FnOnce(&T) -> U) -> Option<U>
352 where
353 T: 'static,
354 {
355 self.data.try_with(|n| n.as_ref().map(f))?
356 }
357}
358
359impl<T, E> LocalResource<Result<T, E>>
360where
361 T: 'static,
362 E: Clone + 'static,
363{
364 #[track_caller]
372 pub fn and_then<U>(&self, f: impl FnOnce(&T) -> U) -> Option<Result<U, E>> {
373 self.map(|data| data.as_ref().map(f).map_err(|e| e.clone()))
374 }
375}
376
377impl<T> IntoFuture for LocalResource<T>
378where
379 T: Clone + 'static,
380{
381 type Output = T;
382 type IntoFuture = AsyncDerivedFuture<T>;
383
384 fn into_future(self) -> Self::IntoFuture {
385 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
386 notifier.notify();
387 } else if cfg!(feature = "ssr") {
388 panic!(
389 "Reading from a LocalResource outside Suspense in `ssr` mode \
390 will cause the response to hang, because LocalResources are \
391 always pending on the server."
392 );
393 }
394 self.data.into_future()
395 }
396}
397
398impl<T> DefinedAt for LocalResource<T> {
399 fn defined_at(&self) -> Option<&'static Location<'static>> {
400 #[cfg(any(debug_assertions, leptos_debuginfo))]
401 {
402 Some(self.defined_at)
403 }
404 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
405 {
406 None
407 }
408 }
409}
410
411impl<T> Notify for LocalResource<T>
412where
413 T: 'static,
414{
415 fn notify(&self) {
416 self.data.notify()
417 }
418}
419
420impl<T> Write for LocalResource<T>
421where
422 T: 'static,
423{
424 type Value = Option<T>;
425
426 fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
427 self.data.try_write()
428 }
429
430 fn try_write_untracked(
431 &self,
432 ) -> Option<impl DerefMut<Target = Self::Value>> {
433 self.data.try_write_untracked()
434 }
435}
436
437impl<T> ReadUntracked for LocalResource<T>
438where
439 T: 'static,
440{
441 type Value =
442 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
443
444 fn try_read_untracked(&self) -> Option<Self::Value> {
445 if let Some(mut notifier) = use_context::<LocalResourceNotifier>() {
446 notifier.notify();
447 }
448 self.data.try_read_untracked()
449 }
450}
451
452impl<T: 'static> IsDisposed for LocalResource<T> {
453 fn is_disposed(&self) -> bool {
454 self.data.is_disposed()
455 }
456}
457
458impl<T: 'static> ToAnySource for LocalResource<T>
459where
460 T: 'static,
461{
462 fn to_any_source(&self) -> AnySource {
463 self.data.to_any_source()
464 }
465}
466
467impl<T: 'static> ToAnySubscriber for LocalResource<T>
468where
469 T: 'static,
470{
471 fn to_any_subscriber(&self) -> AnySubscriber {
472 self.data.to_any_subscriber()
473 }
474}
475
476impl<T> Source for LocalResource<T>
477where
478 T: 'static,
479{
480 fn add_subscriber(&self, subscriber: AnySubscriber) {
481 self.data.add_subscriber(subscriber)
482 }
483
484 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
485 self.data.remove_subscriber(subscriber);
486 }
487
488 fn clear_subscribers(&self) {
489 self.data.clear_subscribers();
490 }
491}
492
493impl<T> ReactiveNode for LocalResource<T>
494where
495 T: 'static,
496{
497 fn mark_dirty(&self) {
498 self.data.mark_dirty();
499 }
500
501 fn mark_check(&self) {
502 self.data.mark_check();
503 }
504
505 fn mark_subscribers_check(&self) {
506 self.data.mark_subscribers_check();
507 }
508
509 fn update_if_necessary(&self) -> bool {
510 self.data.update_if_necessary()
511 }
512}
513
514impl<T> Subscriber for LocalResource<T>
515where
516 T: 'static,
517{
518 fn add_source(&self, source: AnySource) {
519 self.data.add_source(source);
520 }
521
522 fn clear_sources(&self, subscriber: &AnySubscriber) {
523 self.data.clear_sources(subscriber);
524 }
525}
526
527impl<T: 'static> From<ArcLocalResource<T>> for LocalResource<T> {
528 fn from(arc: ArcLocalResource<T>) -> Self {
529 Self {
530 data: arc.data.into(),
531 refetch: arc.refetch.into(),
532 #[cfg(any(debug_assertions, leptos_debuginfo))]
533 defined_at: arc.defined_at,
534 }
535 }
536}
537
538impl<T: 'static> From<LocalResource<T>> for ArcLocalResource<T> {
539 fn from(local: LocalResource<T>) -> Self {
540 Self {
541 data: local.data.into(),
542 refetch: local.refetch.into(),
543 #[cfg(any(debug_assertions, leptos_debuginfo))]
544 defined_at: local.defined_at,
545 }
546 }
547}