reactive_graph/computed/async_derived/
arc_async_derived.rs

1use super::{
2    inner::{ArcAsyncDerivedInner, AsyncDerivedState},
3    AsyncDerivedReadyFuture, ScopedFuture,
4};
5#[cfg(feature = "sandboxed-arenas")]
6use crate::owner::Sandboxed;
7use crate::{
8    channel::channel,
9    computed::suspense::SuspenseContext,
10    diagnostics::SpecialNonReactiveFuture,
11    graph::{
12        AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
13        SubscriberSet, ToAnySource, ToAnySubscriber, WithObserver,
14    },
15    owner::{use_context, Owner},
16    send_wrapper_ext::SendOption,
17    signal::{
18        guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
19        ArcTrigger,
20    },
21    traits::{
22        DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
23        Write,
24    },
25    transition::AsyncTransition,
26};
27use async_lock::RwLock as AsyncRwLock;
28use core::fmt::Debug;
29use futures::{channel::oneshot, FutureExt, StreamExt};
30use or_poisoned::OrPoisoned;
31use std::{
32    future::Future,
33    mem,
34    ops::{Deref, DerefMut},
35    panic::Location,
36    sync::{
37        atomic::{AtomicBool, Ordering},
38        Arc, RwLock, Weak,
39    },
40    task::Waker,
41};
42
/// A reactive value that is derived by running an asynchronous computation in response to changes
/// in its sources.
///
/// When one of its dependencies changes, this will re-run its async computation, then notify other
/// values that depend on it that it has changed.
///
/// This is a reference-counted type, which is `Clone` but not `Copy`.
/// For arena-allocated `Copy` memos, use [`AsyncDerived`](super::AsyncDerived).
///
/// ## Examples
/// ```rust
/// # use reactive_graph::computed::*;
/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
///
/// let signal1 = RwSignal::new(0);
/// let signal2 = RwSignal::new(0);
/// let derived = ArcAsyncDerived::new(move || async move {
///   // reactive values can be tracked anywhere in the `async` block
///   let value1 = signal1.get();
///   tokio::time::sleep(std::time::Duration::from_millis(25)).await;
///   let value2 = signal2.get();
///
///   value1 + value2
/// });
///
/// // the value can be accessed synchronously as `Option<T>`
/// assert_eq!(derived.get(), None);
/// // we can also .await the value, i.e., convert it into a Future
/// assert_eq!(derived.clone().await, 0);
/// assert_eq!(derived.get(), Some(0));
///
/// signal1.set(1);
/// // while the new value is still pending, the signal holds the old value
/// tokio::time::sleep(std::time::Duration::from_millis(5)).await;
/// assert_eq!(derived.get(), Some(0));
///
/// // setting multiple dependencies will hold until the latest change is ready
/// signal2.set(1);
/// assert_eq!(derived.await, 2);
/// # });
/// ```
///
/// ## Core Trait Implementations
/// - [`.get()`](crate::traits::Get) clones the current value as an `Option<T>`.
///   If you call it within an effect, it will cause that effect to subscribe
///   to the memo, and to re-run whenever the value of the memo changes.
///   - [`.get_untracked()`](crate::traits::GetUntracked) clones the current
///     value without reactively tracking it.
/// - [`.read()`](crate::traits::Read) returns a guard that allows accessing the
///   value by reference. If you call it within an effect, it will
///   cause that effect to subscribe to the memo, and to re-run whenever the
///   value changes.
///   - [`.read_untracked()`](crate::traits::ReadUntracked) gives access to the
///     current value without reactively tracking it.
/// - [`.with()`](crate::traits::With) allows you to reactively access the
///   value without cloning by applying a callback function.
///   - [`.with_untracked()`](crate::traits::WithUntracked) allows you to access
///     the value by applying a callback function without reactively
///     tracking it.
/// - [`IntoFuture`](std::future::IntoFuture) allows you to create a [`Future`] that resolves
///   when this resource is done loading.
pub struct ArcAsyncDerived<T> {
    // where this value was created (`Location::caller()` at construction);
    // only present in debug builds or when `leptos_debuginfo` is set
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    pub(crate) defined_at: &'static Location<'static>,
    // the current state of this signal: `None` until a value has been stored
    pub(crate) value: Arc<AsyncRwLock<SendOption<T>>>,
    // holds wakers generated when you .await this; they are drained and woken
    // whenever subscribers are notified
    pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
    // shared reactive-graph state (owner, notifier, sources, subscribers,
    // dirty state, version counter and registered suspense contexts)
    pub(crate) inner: Arc<RwLock<ArcAsyncDerivedInner>>,
    // `true` while a run of the async computation is in progress; reset to
    // `false` when subscribers are notified
    pub(crate) loading: Arc<AtomicBool>,
}
118
/// Synchronous access to an async read-write lock.
///
/// Each method returns the same guard type as the corresponding `async_lock`
/// locking method, but without requiring the caller to `.await`.
// NOTE(review): `dead_code` is presumably allowed because not every method is
// used in every build configuration — confirm before removing.
#[allow(dead_code)]
pub(crate) trait BlockingLock<T> {
    /// Acquires a read lock synchronously, returning an owned (`Arc`-backed)
    /// read guard.
    fn blocking_read_arc(self: &Arc<Self>)
        -> async_lock::RwLockReadGuardArc<T>;

    /// Acquires a write lock synchronously, returning an owned (`Arc`-backed)
    /// write guard.
    fn blocking_write_arc(
        self: &Arc<Self>,
    ) -> async_lock::RwLockWriteGuardArc<T>;

    /// Acquires a read lock synchronously, returning a guard that borrows
    /// the lock.
    fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>;

    /// Acquires a write lock synchronously, returning a guard that borrows
    /// the lock.
    fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>;
}
132
133impl<T> BlockingLock<T> for AsyncRwLock<T> {
134    fn blocking_read_arc(
135        self: &Arc<Self>,
136    ) -> async_lock::RwLockReadGuardArc<T> {
137        #[cfg(not(target_family = "wasm"))]
138        {
139            self.read_arc_blocking()
140        }
141        #[cfg(target_family = "wasm")]
142        {
143            self.read_arc().now_or_never().unwrap()
144        }
145    }
146
147    fn blocking_write_arc(
148        self: &Arc<Self>,
149    ) -> async_lock::RwLockWriteGuardArc<T> {
150        #[cfg(not(target_family = "wasm"))]
151        {
152            self.write_arc_blocking()
153        }
154        #[cfg(target_family = "wasm")]
155        {
156            self.write_arc().now_or_never().unwrap()
157        }
158    }
159
160    fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> {
161        #[cfg(not(target_family = "wasm"))]
162        {
163            self.read_blocking()
164        }
165        #[cfg(target_family = "wasm")]
166        {
167            self.read().now_or_never().unwrap()
168        }
169    }
170
171    fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> {
172        #[cfg(not(target_family = "wasm"))]
173        {
174            self.write_blocking()
175        }
176        #[cfg(target_family = "wasm")]
177        {
178            self.write().now_or_never().unwrap()
179        }
180    }
181}
182
183impl<T> Clone for ArcAsyncDerived<T> {
184    fn clone(&self) -> Self {
185        Self {
186            #[cfg(any(debug_assertions, leptos_debuginfo))]
187            defined_at: self.defined_at,
188            value: Arc::clone(&self.value),
189            wakers: Arc::clone(&self.wakers),
190            inner: Arc::clone(&self.inner),
191            loading: Arc::clone(&self.loading),
192        }
193    }
194}
195
196impl<T> Debug for ArcAsyncDerived<T> {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        let mut f = f.debug_struct("ArcAsyncDerived");
199        #[cfg(any(debug_assertions, leptos_debuginfo))]
200        f.field("defined_at", &self.defined_at);
201        f.finish_non_exhaustive()
202    }
203}
204
205impl<T> DefinedAt for ArcAsyncDerived<T> {
206    #[inline(always)]
207    fn defined_at(&self) -> Option<&'static Location<'static>> {
208        #[cfg(any(debug_assertions, leptos_debuginfo))]
209        {
210            Some(self.defined_at)
211        }
212        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
213        {
214            None
215        }
216    }
217}
218
// This helps create a derived async signal.
// It needs to be implemented as a macro because it needs to be flexible over
// whether `fun` returns a `Future` that is `Send`. Doing it as a function would,
// as far as I can tell, require repeating most of the function body.
//
// Arguments:
// - `$spawner`: the function used to spawn the update task (callers in this
//   file pass `crate::spawn` or `crate::spawn_local`)
// - `$initial`: identifier bound to the initial `SendOption<T>` value
// - `$fun`: identifier bound to a closure producing the `Future` to run
// - `$should_spawn`: whether the task listening for notifications is spawned
// - `$force_spawn`: when `true`, the value is treated as not ready even if
//   `$initial` already holds a value
// - `$should_track`: whether `$fun` is called inside `with_observer` (`true`)
//   or `with_observer_untracked` (`false`)
// - `$source`: an `Option` of something that is explicitly `.track()`ed with
//   this node as the observer
//
// Evaluates to the tuple `(ArcAsyncDerived, is_ready)`.
macro_rules! spawn_derived {
    ($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal, $force_spawn:literal, $should_track:literal, $source:expr) => {{
        // `notifier` is stored in the inner state; `rx` is consumed by the
        // spawned task below
        let (notifier, mut rx) = channel();

        // ready only if an initial value was given and a run is not forced
        let is_ready = $initial.is_some() && !$force_spawn;

        let owner = Owner::new();
        let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
            owner: owner.clone(),
            notifier,
            sources: SourceSet::new(),
            subscribers: SubscriberSet::new(),
            state: AsyncDerivedState::Clean,
            version: 0,
            suspenses: Vec::new(),
            pending_suspenses: Vec::new()
        }));
        let value = Arc::new(AsyncRwLock::new($initial));
        let wakers = Arc::new(RwLock::new(Vec::new()));

        let this = ArcAsyncDerived {
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
            value: Arc::clone(&value),
            wakers,
            inner: Arc::clone(&inner),
            loading: Arc::new(AtomicBool::new(!is_ready)),
        };
        let any_subscriber = this.to_any_subscriber();
        // create the first future eagerly, under this node's owner and with
        // this node as the (tracked or untracked) observer
        let initial_fut = if $should_track {
            owner.with_cleanup(|| {
                any_subscriber
                    .with_observer(|| ScopedFuture::new($fun()))
            })
        } else {
            owner.with_cleanup(|| {
                any_subscriber
                    .with_observer_untracked(|| ScopedFuture::new($fun()))
            })
        };
        #[cfg(feature = "sandboxed-arenas")]
        let initial_fut = Sandboxed::new(initial_fut);
        let mut initial_fut = Box::pin(initial_fut);

        // `was_ready`: a value is available synchronously (either supplied
        // up front or produced by the first poll); `initial_fut` is kept only
        // if it is still pending and must be driven by the spawned task
        let (was_ready, mut initial_fut) = {
            if is_ready {
                (true, None)
            } else {
                // if we don't already know that it's ready, we need to poll once, initially
                // so that the correct value is set synchronously
                let initial = initial_fut.as_mut().now_or_never();
                match initial {
                    None => {
                        // still pending: send a notification so that the
                        // task's `rx` loop below has something to act on
                        inner.write().or_poisoned().notifier.notify();
                        (false, Some(initial_fut))
                    }
                    Some(orig_value) => {
                        // resolved on the first poll: store the value now
                        let mut guard = this.inner.write().or_poisoned();

                        guard.state = AsyncDerivedState::Clean;
                        *value.blocking_write() = orig_value;
                        this.loading.store(false, Ordering::Relaxed);
                        (true, None)
                    }
                }
            }
        };

        // sender that is signalled when the first run completes; its receiver
        // is registered with the current `AsyncTransition` only when the
        // value is not already available
        let mut first_run = {
            let (ready_tx, ready_rx) = oneshot::channel();
            if !was_ready {
                AsyncTransition::register(ready_rx);
            }
            Some(ready_tx)
        };

        // nothing is pending, so there is no "first run" to report
        if was_ready {
            first_run.take();
        }

        // manual dependency: track the given source with this node as observer
        if let Some(source) = $source {
            any_subscriber.with_observer(|| source.track());
        }

        if $should_spawn {
            $spawner({
                // the task only holds weak references, so it does not keep
                // the `ArcAsyncDerived` alive; once any of them fails to
                // upgrade, the loop below exits
                let value = Arc::downgrade(&this.value);
                let inner = Arc::downgrade(&this.inner);
                let wakers = Arc::downgrade(&this.wakers);
                let loading = Arc::downgrade(&this.loading);
                let fut = async move {
                    // if the AsyncDerived has *already* been marked dirty (i.e., one of its
                    // sources has changed after creation), we should throw out the Future
                    // we already created, because its values might be stale
                    let already_dirty = inner.upgrade()
                        .as_ref()
                        .and_then(|inner| inner.read().ok())
                        .map(|inner| inner.state == AsyncDerivedState::Dirty)
                        .unwrap_or(false);
                    if already_dirty {
                        initial_fut.take();
                    }

                    // one iteration per notification received on the channel
                    while rx.next().await.is_some() {
                        // skipped entirely while the owner is paused
                        let update_if_necessary = !owner.paused() && if $should_track {
                            any_subscriber
                                .with_observer(|| any_subscriber.update_if_necessary())
                        } else {
                            any_subscriber
                                .with_observer_untracked(|| any_subscriber.update_if_necessary())
                        };
                        // run when something changed, or when the first run
                        // has not completed yet
                        if update_if_necessary || first_run.is_some() {
                            match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
                                (Some(value), Some(inner), Some(wakers), Some(loading)) => {
                                    // generate new Future
                                    // (reusing the pending initial future, if there still is one)
                                    let owner = inner.read().or_poisoned().owner.clone();
                                    let fut = initial_fut.take().unwrap_or_else(|| {
                                        let fut = if $should_track {
                                            owner.with_cleanup(|| {
                                                any_subscriber
                                                    .with_observer(|| ScopedFuture::new($fun()))
                                            })
                                        } else {
                                            owner.with_cleanup(|| {
                                                any_subscriber
                                                    .with_observer_untracked(|| ScopedFuture::new($fun()))
                                            })
                                        };
                                        #[cfg(feature = "sandboxed-arenas")]
                                        let fut = Sandboxed::new(fut);
                                        Box::pin(fut)
                                    });

                                    // register with global transition listener, if any
                                    let ready_tx = first_run.take().unwrap_or_else(|| {
                                        let (ready_tx, ready_rx) = oneshot::channel();
                                        if !was_ready {
                                            AsyncTransition::register(ready_rx);
                                        }
                                        ready_tx
                                    });

                                    // generate and assign new value
                                    loading.store(true, Ordering::Relaxed);

                                    // bump the version for this run, and move the
                                    // registered suspense contexts' task handles
                                    // into `pending_suspenses` while it is in flight
                                    let this_version = {
                                        let mut guard = inner.write().or_poisoned();
                                        guard.version += 1;
                                        let version = guard.version;
                                        let suspense_ids = mem::take(&mut guard.suspenses)
                                            .into_iter()
                                            .map(|sc| sc.task_id())
                                            .collect::<Vec<_>>();
                                        guard.pending_suspenses.extend(suspense_ids);
                                        version
                                    };

                                    let new_value = fut.await;

                                    // the run is over: drop the pending suspense
                                    // handles and read the current version
                                    let latest_version = {
                                        let mut guard = inner.write().or_poisoned();
                                        drop(mem::take(&mut guard.pending_suspenses));
                                        guard.version
                                    };

                                    // only store the result if the version was not
                                    // bumped (by a write or a newer run) in the meantime
                                    if latest_version == this_version {
                                        Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
                                    }
                                }
                                // the `ArcAsyncDerived` has been dropped: stop the task
                                _ => break,
                            }
                        }
                    }
                };

                #[cfg(feature = "sandboxed-arenas")]
                let fut = Sandboxed::new(fut);

                fut
            });
        }

        (this, is_ready)
    }};
}
408
409impl<T: 'static> ArcAsyncDerived<T> {
410    async fn set_inner_value(
411        new_value: SendOption<T>,
412        value: Arc<AsyncRwLock<SendOption<T>>>,
413        wakers: Arc<RwLock<Vec<Waker>>>,
414        inner: Arc<RwLock<ArcAsyncDerivedInner>>,
415        loading: Arc<AtomicBool>,
416        ready_tx: Option<oneshot::Sender<()>>,
417    ) {
418        *value.write().await.deref_mut() = new_value;
419        Self::notify_subs(&wakers, &inner, &loading, ready_tx);
420    }
421
422    fn notify_subs(
423        wakers: &Arc<RwLock<Vec<Waker>>>,
424        inner: &Arc<RwLock<ArcAsyncDerivedInner>>,
425        loading: &Arc<AtomicBool>,
426        ready_tx: Option<oneshot::Sender<()>>,
427    ) {
428        loading.store(false, Ordering::Relaxed);
429
430        let prev_state = mem::replace(
431            &mut inner.write().or_poisoned().state,
432            AsyncDerivedState::Notifying,
433        );
434
435        if let Some(ready_tx) = ready_tx {
436            // if it's an Err, that just means the Receiver was dropped
437            // we don't particularly care about that: the point is to notify if
438            // it still exists, but we don't need to know if Suspense is no
439            // longer listening
440            _ = ready_tx.send(());
441        }
442
443        // notify reactive subscribers that we're not loading any more
444        for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
445            sub.mark_dirty();
446        }
447
448        // notify async .awaiters
449        for waker in mem::take(&mut *wakers.write().or_poisoned()) {
450            waker.wake();
451        }
452
453        // if this was marked dirty before notifications began, this means it
454        // had been notified while loading; marking it clean will cause it not to
455        // run on the next tick of the async loop, so here it should be left dirty
456        inner.write().or_poisoned().state = prev_state;
457    }
458}
459
460impl<T: 'static> ArcAsyncDerived<T> {
461    /// Creates a new async derived computation.
462    ///
463    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
464    /// as a new task.
465    #[track_caller]
466    pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
467    where
468        T: Send + Sync + 'static,
469        Fut: Future<Output = T> + Send + 'static,
470    {
471        Self::new_with_initial(None, fun)
472    }
473
474    /// Creates a new async derived computation with an initial value as a fallback, and begins running the
475    /// `Future` eagerly to get the actual first value.
476    #[track_caller]
477    pub fn new_with_initial<Fut>(
478        initial_value: Option<T>,
479        fun: impl Fn() -> Fut + Send + Sync + 'static,
480    ) -> Self
481    where
482        T: Send + Sync + 'static,
483        Fut: Future<Output = T> + Send + 'static,
484    {
485        let fun = move || {
486            let fut = fun();
487            let fut = async move { SendOption::new(Some(fut.await)) };
488            #[cfg(feature = "sandboxed-arenas")]
489            let fut = Sandboxed::new(fut);
490            fut
491        };
492        let initial_value = SendOption::new(initial_value);
493        let (this, _) = spawn_derived!(
494            crate::spawn,
495            initial_value,
496            fun,
497            true,
498            true,
499            true,
500            None::<ArcTrigger>
501        );
502        this
503    }
504
505    /// Creates a new async derived computation with an initial value, and does not spawn a task
506    /// initially.
507    ///
508    /// This is mostly used with manual dependency tracking, for primitives built on top of this
509    /// where you do not want to run the run the `Future` unnecessarily.
510    #[doc(hidden)]
511    #[track_caller]
512    pub fn new_with_manual_dependencies<Fut, S>(
513        initial_value: Option<T>,
514        fun: impl Fn() -> Fut + Send + Sync + 'static,
515        source: &S,
516    ) -> Self
517    where
518        T: Send + Sync + 'static,
519        Fut: Future<Output = T> + Send + 'static,
520        S: Track,
521    {
522        let fun = move || {
523            let fut = fun();
524            let fut =
525                ScopedFuture::new_untracked_with_diagnostics(async move {
526                    SendOption::new(Some(fut.await))
527                });
528            #[cfg(feature = "sandboxed-arenas")]
529            let fut = Sandboxed::new(fut);
530            fut
531        };
532        let initial_value = SendOption::new(initial_value);
533        let (this, _) = spawn_derived!(
534            crate::spawn,
535            initial_value,
536            fun,
537            true,
538            false,
539            false,
540            Some(source)
541        );
542        this
543    }
544
545    /// Creates a new async derived computation that will be guaranteed to run on the current
546    /// thread.
547    ///
548    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
549    /// as a new task.
550    #[track_caller]
551    pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
552    where
553        T: 'static,
554        Fut: Future<Output = T> + 'static,
555    {
556        Self::new_unsync_with_initial(None, fun)
557    }
558
559    /// Creates a new async derived computation with an initial value as a fallback, and begins running the
560    /// `Future` eagerly to get the actual first value.
561    #[track_caller]
562    pub fn new_unsync_with_initial<Fut>(
563        initial_value: Option<T>,
564        fun: impl Fn() -> Fut + 'static,
565    ) -> Self
566    where
567        T: 'static,
568        Fut: Future<Output = T> + 'static,
569    {
570        let fun = move || {
571            let fut = fun();
572            let fut = async move { SendOption::new_local(Some(fut.await)) };
573            #[cfg(feature = "sandboxed-arenas")]
574            let fut = Sandboxed::new(fut);
575            fut
576        };
577        let initial_value = SendOption::new_local(initial_value);
578        let (this, _) = spawn_derived!(
579            crate::spawn_local,
580            initial_value,
581            fun,
582            true,
583            true,
584            true,
585            None::<ArcTrigger>
586        );
587        this
588    }
589
590    /// Returns a `Future` that is ready when this resource has next finished loading.
591    pub fn ready(&self) -> AsyncDerivedReadyFuture {
592        AsyncDerivedReadyFuture::new(
593            self.to_any_source(),
594            &self.loading,
595            &self.wakers,
596        )
597    }
598}
599
600impl<T: 'static> ArcAsyncDerived<T> {
601    #[doc(hidden)]
602    #[track_caller]
603    pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
604    where
605        T: 'static,
606        Fut: Future<Output = T> + 'static,
607    {
608        let initial = SendOption::new_local(None::<T>);
609        let fun = move || {
610            let fut = fun();
611            let fut = async move { SendOption::new_local(Some(fut.await)) };
612            #[cfg(feature = "sandboxed-arenas")]
613            let fut = Sandboxed::new(fut);
614            fut
615        };
616        let (this, _) = spawn_derived!(
617            crate::spawn_local,
618            initial,
619            fun,
620            false,
621            false,
622            true,
623            None::<ArcTrigger>
624        );
625        this
626    }
627}
628
629impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
630    type Value =
631        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
632
633    fn try_read_untracked(&self) -> Option<Self::Value> {
634        if let Some(suspense_context) = use_context::<SuspenseContext>() {
635            let handle = suspense_context.task_id();
636            let ready = SpecialNonReactiveFuture::new(self.ready());
637            crate::spawn(async move {
638                ready.await;
639                drop(handle);
640            });
641            self.inner
642                .write()
643                .or_poisoned()
644                .suspenses
645                .push(suspense_context);
646        }
647        AsyncPlain::try_new(&self.value).map(|plain| {
648            ReadGuard::new(Mapped::new_with_guard(plain, |v| v.deref()))
649        })
650    }
651}
652
653impl<T: 'static> Notify for ArcAsyncDerived<T> {
654    fn notify(&self) {
655        Self::notify_subs(&self.wakers, &self.inner, &self.loading, None);
656    }
657}
658
659impl<T: 'static> Write for ArcAsyncDerived<T> {
660    type Value = Option<T>;
661
662    fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
663        // increment the version, such that a rerun triggered previously does not overwrite this
664        // new value
665        let mut guard = self.inner.write().or_poisoned();
666        guard.version += 1;
667
668        // tell any suspenses to stop waiting for this
669        drop(mem::take(&mut guard.pending_suspenses));
670
671        Some(MappedMut::new(
672            WriteGuard::new(self.clone(), self.value.blocking_write()),
673            |v| v.deref(),
674            |v| v.deref_mut(),
675        ))
676    }
677
678    fn try_write_untracked(
679        &self,
680    ) -> Option<impl DerefMut<Target = Self::Value>> {
681        // increment the version, such that a rerun triggered previously does not overwrite this
682        // new value
683        let mut guard = self.inner.write().or_poisoned();
684        guard.version += 1;
685
686        // tell any suspenses to stop waiting for this
687        drop(mem::take(&mut guard.pending_suspenses));
688
689        Some(MappedMut::new(
690            self.value.blocking_write(),
691            |v| v.deref(),
692            |v| v.deref_mut(),
693        ))
694    }
695}
696
impl<T: 'static> IsDisposed for ArcAsyncDerived<T> {
    /// Always returns `false`.
    // NOTE(review): presumably because a reference-counted value stays valid
    // for as long as a handle to it exists — confirm against the trait docs.
    #[inline(always)]
    fn is_disposed(&self) -> bool {
        false
    }
}
703
704impl<T: 'static> ToAnySource for ArcAsyncDerived<T> {
705    fn to_any_source(&self) -> AnySource {
706        AnySource(
707            Arc::as_ptr(&self.inner) as usize,
708            Arc::downgrade(&self.inner) as Weak<dyn Source + Send + Sync>,
709            #[cfg(any(debug_assertions, leptos_debuginfo))]
710            self.defined_at,
711        )
712    }
713}
714
715impl<T: 'static> ToAnySubscriber for ArcAsyncDerived<T> {
716    fn to_any_subscriber(&self) -> AnySubscriber {
717        AnySubscriber(
718            Arc::as_ptr(&self.inner) as usize,
719            Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
720        )
721    }
722}
723
/// `Source` implementation: every method forwards to the shared `inner` state.
impl<T> Source for ArcAsyncDerived<T> {
    /// Forwards to `inner.add_subscriber`.
    fn add_subscriber(&self, subscriber: AnySubscriber) {
        self.inner.add_subscriber(subscriber);
    }

    /// Forwards to `inner.remove_subscriber`.
    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
        self.inner.remove_subscriber(subscriber);
    }

    /// Forwards to `inner.clear_subscribers`.
    fn clear_subscribers(&self) {
        self.inner.clear_subscribers();
    }
}
737
/// `ReactiveNode` implementation: every method forwards to the shared `inner`
/// state.
impl<T> ReactiveNode for ArcAsyncDerived<T> {
    /// Forwards to `inner.mark_dirty`.
    fn mark_dirty(&self) {
        self.inner.mark_dirty();
    }

    /// Forwards to `inner.mark_check`.
    fn mark_check(&self) {
        self.inner.mark_check();
    }

    /// Forwards to `inner.mark_subscribers_check`.
    fn mark_subscribers_check(&self) {
        self.inner.mark_subscribers_check();
    }

    /// Forwards to `inner.update_if_necessary` and returns its result.
    fn update_if_necessary(&self) -> bool {
        self.inner.update_if_necessary()
    }
}
755
/// `Subscriber` implementation: every method forwards to the shared `inner`
/// state.
impl<T> Subscriber for ArcAsyncDerived<T> {
    /// Forwards to `inner.add_source`.
    fn add_source(&self, source: AnySource) {
        self.inner.add_source(source);
    }

    /// Forwards to `inner.clear_sources`.
    fn clear_sources(&self, subscriber: &AnySubscriber) {
        self.inner.clear_sources(subscriber);
    }
}