reactive_graph/computed/async_derived/
arc_async_derived.rs

1use super::{
2    inner::{ArcAsyncDerivedInner, AsyncDerivedState},
3    AsyncDerivedReadyFuture, ScopedFuture,
4};
5#[cfg(feature = "sandboxed-arenas")]
6use crate::owner::Sandboxed;
7use crate::{
8    channel::channel,
9    computed::suspense::SuspenseContext,
10    diagnostics::SpecialNonReactiveFuture,
11    graph::{
12        AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
13        SubscriberSet, ToAnySource, ToAnySubscriber, WithObserver,
14    },
15    owner::{use_context, Owner},
16    send_wrapper_ext::SendOption,
17    signal::{
18        guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
19        ArcTrigger,
20    },
21    traits::{
22        DefinedAt, IsDisposed, Notify, ReadUntracked, Track, UntrackableGuard,
23        Write,
24    },
25    transition::AsyncTransition,
26};
27use async_lock::RwLock as AsyncRwLock;
28use core::fmt::Debug;
29use futures::{channel::oneshot, FutureExt, StreamExt};
30use or_poisoned::OrPoisoned;
31use std::{
32    future::Future,
33    mem,
34    ops::{Deref, DerefMut},
35    panic::Location,
36    sync::{
37        atomic::{AtomicBool, Ordering},
38        Arc, RwLock, Weak,
39    },
40    task::Waker,
41};
42
43/// A reactive value that is derived by running an asynchronous computation in response to changes
44/// in its sources.
45///
46/// When one of its dependencies changes, this will re-run its async computation, then notify other
47/// values that depend on it that it has changed.
48///
49/// This is a reference-counted type, which is `Clone` but not `Copy`.
50/// For arena-allocated `Copy` memos, use [`AsyncDerived`](super::AsyncDerived).
51///
52/// ## Examples
53/// ```rust
54/// # use reactive_graph::computed::*;
55/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
56/// # use reactive_graph::prelude::*;
57/// # tokio_test::block_on(async move {
58/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
59/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
60///
61/// let signal1 = RwSignal::new(0);
62/// let signal2 = RwSignal::new(0);
63/// let derived = ArcAsyncDerived::new(move || async move {
64///   // reactive values can be tracked anywhere in the `async` block
65///   let value1 = signal1.get();
66///   tokio::time::sleep(std::time::Duration::from_millis(25)).await;
67///   let value2 = signal2.get();
68///
69///   value1 + value2
70/// });
71///
72/// // the value can be accessed synchronously as `Option<T>`
73/// assert_eq!(derived.get(), None);
74/// // we can also .await the value, i.e., convert it into a Future
75/// assert_eq!(derived.clone().await, 0);
76/// assert_eq!(derived.get(), Some(0));
77///
78/// signal1.set(1);
79/// // while the new value is still pending, the signal holds the old value
80/// tokio::time::sleep(std::time::Duration::from_millis(5)).await;
81/// assert_eq!(derived.get(), Some(0));
82///
83/// // setting multiple dependencies will hold until the latest change is ready
84/// signal2.set(1);
85/// assert_eq!(derived.await, 2);
86/// # });
87/// ```
88///
89/// ## Core Trait Implementations
90/// - [`.get()`](crate::traits::Get) clones the current value as an `Option<T>`.
91///   If you call it within an effect, it will cause that effect to subscribe
92///   to the memo, and to re-run whenever the value of the memo changes.
///   - [`.get_untracked()`](crate::traits::GetUntracked) clones the value
///     without reactively tracking it.
95/// - [`.read()`](crate::traits::Read) returns a guard that allows accessing the
96///   value by reference. If you call it within an effect, it will
97///   cause that effect to subscribe to the memo, and to re-run whenever the
98///   value changes.
99///   - [`.read_untracked()`](crate::traits::ReadUntracked) gives access to the
100///     current value without reactively tracking it.
101/// - [`.with()`](crate::traits::With) allows you to reactively access the
102///   value without cloning by applying a callback function.
103///   - [`.with_untracked()`](crate::traits::WithUntracked) allows you to access
104///     the value by applying a callback function without reactively
105///     tracking it.
/// - [`IntoFuture`](std::future::IntoFuture) allows you to create a [`Future`] that resolves
///   when this resource is done loading.
pub struct ArcAsyncDerived<T> {
    // source location at which this value was created; only stored in debug
    // builds or when `leptos_debuginfo` is set
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    pub(crate) defined_at: &'static Location<'static>,
    // the current state of this signal
    pub(crate) value: Arc<AsyncRwLock<SendOption<T>>>,
    // holds wakers generated when you .await this
    pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
    // shared reactive-graph bookkeeping (owner, notifier, sources, subscribers,
    // state, version and suspense lists)
    pub(crate) inner: Arc<RwLock<ArcAsyncDerivedInner>>,
    // `true` while a new value is being computed; reset to `false` once
    // subscribers have been notified of a result
    pub(crate) loading: Arc<AtomicBool>,
}
118
// NOTE(review): `dead_code` is presumably allowed because not every method is
// used in every build configuration — confirm before removing.
#[allow(dead_code)]
/// Synchronous ways of acquiring an async read/write lock.
///
/// Each method returns the same guard type as the corresponding async lock
/// method, but without requiring the caller to `.await`.
pub(crate) trait BlockingLock<T> {
    /// Acquires a read lock on a lock held in an `Arc`, returning the
    /// `Arc`-based read guard.
    fn blocking_read_arc(self: &Arc<Self>)
        -> async_lock::RwLockReadGuardArc<T>;

    /// Acquires a write lock on a lock held in an `Arc`, returning the
    /// `Arc`-based write guard.
    fn blocking_write_arc(
        self: &Arc<Self>,
    ) -> async_lock::RwLockWriteGuardArc<T>;

    /// Acquires a read lock, returning a guard that borrows from `self`.
    fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>;

    /// Acquires a write lock, returning a guard that borrows from `self`.
    fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>;
}
132
133impl<T> BlockingLock<T> for AsyncRwLock<T> {
134    fn blocking_read_arc(
135        self: &Arc<Self>,
136    ) -> async_lock::RwLockReadGuardArc<T> {
137        #[cfg(not(target_family = "wasm"))]
138        {
139            self.read_arc_blocking()
140        }
141        #[cfg(target_family = "wasm")]
142        {
143            self.read_arc().now_or_never().unwrap()
144        }
145    }
146
147    fn blocking_write_arc(
148        self: &Arc<Self>,
149    ) -> async_lock::RwLockWriteGuardArc<T> {
150        #[cfg(not(target_family = "wasm"))]
151        {
152            self.write_arc_blocking()
153        }
154        #[cfg(target_family = "wasm")]
155        {
156            self.write_arc().now_or_never().unwrap()
157        }
158    }
159
160    fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> {
161        #[cfg(not(target_family = "wasm"))]
162        {
163            self.read_blocking()
164        }
165        #[cfg(target_family = "wasm")]
166        {
167            self.read().now_or_never().unwrap()
168        }
169    }
170
171    fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> {
172        #[cfg(not(target_family = "wasm"))]
173        {
174            self.write_blocking()
175        }
176        #[cfg(target_family = "wasm")]
177        {
178            self.write().now_or_never().unwrap()
179        }
180    }
181}
182
183impl<T> Clone for ArcAsyncDerived<T> {
184    fn clone(&self) -> Self {
185        Self {
186            #[cfg(any(debug_assertions, leptos_debuginfo))]
187            defined_at: self.defined_at,
188            value: Arc::clone(&self.value),
189            wakers: Arc::clone(&self.wakers),
190            inner: Arc::clone(&self.inner),
191            loading: Arc::clone(&self.loading),
192        }
193    }
194}
195
196impl<T> Debug for ArcAsyncDerived<T> {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        let mut f = f.debug_struct("ArcAsyncDerived");
199        #[cfg(any(debug_assertions, leptos_debuginfo))]
200        f.field("defined_at", &self.defined_at);
201        f.finish_non_exhaustive()
202    }
203}
204
205impl<T> DefinedAt for ArcAsyncDerived<T> {
206    #[inline(always)]
207    fn defined_at(&self) -> Option<&'static Location<'static>> {
208        #[cfg(any(debug_assertions, leptos_debuginfo))]
209        {
210            Some(self.defined_at)
211        }
212        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
213        {
214            None
215        }
216    }
217}
218
// This helps create a derived async signal.
// It needs to be implemented as a macro because it needs to be flexible over
// whether `fun` returns a `Future` that is `Send`. Doing it as a function would,
// as far as I can tell, require repeating most of the function body.
//
// Arguments:
// - `$spawner`: the function used to spawn the background update task
//   (callers in this file pass `crate::spawn` or `crate::spawn_local`)
// - `$initial`: identifier bound to the initial `SendOption<T>` value
// - `$fun`: identifier bound to a closure that creates the future to run
// - `$should_spawn`: whether to spawn the background task that re-runs the future
// - `$force_spawn`: if `true`, the value is not treated as ready even when
//   `$initial` already holds a value
// - `$should_track`: whether `$fun` runs under `with_observer` (tracked) or
//   `with_observer_untracked`
// - `$source`: an `Option` of a source that is tracked explicitly
//
// Evaluates to the tuple `(this, is_ready)`.
macro_rules! spawn_derived {
    ($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal, $force_spawn:literal, $should_track:literal, $source:expr) => {{
        let (notifier, mut rx) = channel();

        // ready immediately only if an initial value was given and a spawn is not forced
        let is_ready = $initial.is_some() && !$force_spawn;

        let owner = Owner::new();
        let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
            owner: owner.clone(),
            notifier,
            sources: SourceSet::new(),
            subscribers: SubscriberSet::new(),
            state: AsyncDerivedState::Clean,
            version: 0,
            suspenses: Vec::new(),
            pending_suspenses: Vec::new()
        }));
        let value = Arc::new(AsyncRwLock::new($initial));
        let wakers = Arc::new(RwLock::new(Vec::new()));

        let this = ArcAsyncDerived {
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
            value: Arc::clone(&value),
            wakers,
            inner: Arc::clone(&inner),
            loading: Arc::new(AtomicBool::new(!is_ready)),
        };
        let any_subscriber = this.to_any_subscriber();
        // create the first future eagerly, with this derived as the observer
        let initial_fut = if $should_track {
            owner.with_cleanup(|| {
                any_subscriber
                    .with_observer(|| ScopedFuture::new($fun()))
            })
        } else {
            owner.with_cleanup(|| {
                any_subscriber
                    .with_observer_untracked(|| ScopedFuture::new($fun()))
            })
        };
        #[cfg(feature = "sandboxed-arenas")]
        let initial_fut = Sandboxed::new(initial_fut);
        let mut initial_fut = Box::pin(initial_fut);

        let (was_ready, mut initial_fut) = {
            if is_ready {
                (true, None)
            } else {
                // if we don't already know that it's ready, we need to poll once, initially
                // so that the correct value is set synchronously
                let initial = initial_fut.as_mut().now_or_never();
                match initial {
                    None => {
                        // still pending: notify the channel so the spawned loop
                        // below picks the future up
                        inner.write().or_poisoned().notifier.notify();
                        (false, Some(initial_fut))
                    }
                    Some(orig_value) => {
                        // resolved on the first poll: store the value right away
                        let mut guard = this.inner.write().or_poisoned();

                        guard.state = AsyncDerivedState::Clean;
                        *value.blocking_write() = orig_value;
                        this.loading.store(false, Ordering::Relaxed);
                        (true, None)
                    }
                }
            }
        };

        // `first_run` holds the sender that signals completion of the first run;
        // the receiver is registered with `AsyncTransition` only if the value
        // was not ready synchronously
        let mut first_run = {
            let (ready_tx, ready_rx) = oneshot::channel();
            if !was_ready {
                AsyncTransition::register(ready_rx);
            }
            Some(ready_tx)
        };

        // nothing to signal if the value was already available
        if was_ready {
            first_run.take();
        }

        // track the explicitly provided source, if any, with this derived as observer
        if let Some(source) = $source {
            any_subscriber.with_observer(|| source.track());
        }

        if $should_spawn {
            $spawner({
                // the task only holds weak references, so it does not keep the
                // derived value alive on its own
                let value = Arc::downgrade(&this.value);
                let inner = Arc::downgrade(&this.inner);
                let wakers = Arc::downgrade(&this.wakers);
                let loading = Arc::downgrade(&this.loading);
                let fut = async move {
                    // if the AsyncDerived has *already* been marked dirty (i.e., one of its
                    // sources has changed after creation), we should throw out the Future
                    // we already created, because its values might be stale
                    let already_dirty = inner.upgrade()
                        .as_ref()
                        .and_then(|inner| inner.read().ok())
                        .map(|inner| inner.state == AsyncDerivedState::Dirty)
                        .unwrap_or(false);
                    if already_dirty {
                        initial_fut.take();
                    }

                    // one iteration per notification received on the channel
                    while rx.next().await.is_some() {
                        let update_if_necessary = !owner.paused() && if $should_track {
                            any_subscriber
                                .with_observer(|| any_subscriber.update_if_necessary())
                        } else {
                            any_subscriber
                                .with_observer_untracked(|| any_subscriber.update_if_necessary())
                        };
                        if update_if_necessary || first_run.is_some() {
                            match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) {
                                (Some(value), Some(inner), Some(wakers), Some(loading)) => {
                                    // generate new Future
                                    // (or reuse the one created eagerly above, if it is still there)
                                    let owner = inner.read().or_poisoned().owner.clone();
                                    let fut = initial_fut.take().unwrap_or_else(|| {
                                        let fut = if $should_track {
                                            owner.with_cleanup(|| {
                                                any_subscriber
                                                    .with_observer(|| ScopedFuture::new($fun()))
                                            })
                                        } else {
                                            owner.with_cleanup(|| {
                                                any_subscriber
                                                    .with_observer_untracked(|| ScopedFuture::new($fun()))
                                            })
                                        };
                                        #[cfg(feature = "sandboxed-arenas")]
                                        let fut = Sandboxed::new(fut);
                                        Box::pin(fut)
                                    });

                                    // register with global transition listener, if any
                                    let ready_tx = first_run.take().unwrap_or_else(|| {
                                        let (ready_tx, ready_rx) = oneshot::channel();
                                        if !was_ready {
                                            AsyncTransition::register(ready_rx);
                                        }
                                        ready_tx
                                    });

                                    // generate and assign new value
                                    loading.store(true, Ordering::Relaxed);

                                    // bump the version so we can tell afterwards whether
                                    // something else changed it while this run was pending
                                    let this_version = {
                                        let mut guard = inner.write().or_poisoned();
                                        guard.version += 1;
                                        let version = guard.version;
                                        // turn the collected suspense contexts into task ids
                                        // and keep them in `pending_suspenses` during this run
                                        let suspense_ids = mem::take(&mut guard.suspenses)
                                            .into_iter()
                                            .map(|sc| sc.task_id())
                                            .collect::<Vec<_>>();
                                        guard.pending_suspenses.extend(suspense_ids);
                                        version
                                    };

                                    let new_value = fut.await;

                                    let latest_version = {
                                        let mut guard = inner.write().or_poisoned();
                                        // this run is finished: drop the pending task ids
                                        drop(mem::take(&mut guard.pending_suspenses));
                                        guard.version
                                    };

                                    // only store the result if the version did not change
                                    // while the future was pending
                                    if latest_version == this_version {
                                        Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await;
                                    }
                                }
                                // at least one weak reference could not be upgraded,
                                // so stop the task
                                _ => break,
                            }
                        }
                    }
                };

                #[cfg(feature = "sandboxed-arenas")]
                let fut = Sandboxed::new(fut);

                fut
            });
        }

        (this, is_ready)
    }};
}
408
409impl<T: 'static> ArcAsyncDerived<T> {
410    async fn set_inner_value(
411        new_value: SendOption<T>,
412        value: Arc<AsyncRwLock<SendOption<T>>>,
413        wakers: Arc<RwLock<Vec<Waker>>>,
414        inner: Arc<RwLock<ArcAsyncDerivedInner>>,
415        loading: Arc<AtomicBool>,
416        ready_tx: Option<oneshot::Sender<()>>,
417    ) {
418        *value.write().await.deref_mut() = new_value;
419        Self::notify_subs(&wakers, &inner, &loading, ready_tx);
420    }
421
422    fn notify_subs(
423        wakers: &Arc<RwLock<Vec<Waker>>>,
424        inner: &Arc<RwLock<ArcAsyncDerivedInner>>,
425        loading: &Arc<AtomicBool>,
426        ready_tx: Option<oneshot::Sender<()>>,
427    ) {
428        loading.store(false, Ordering::Relaxed);
429
430        let prev_state = mem::replace(
431            &mut inner.write().or_poisoned().state,
432            AsyncDerivedState::Notifying,
433        );
434
435        if let Some(ready_tx) = ready_tx {
436            // if it's an Err, that just means the Receiver was dropped
437            // we don't particularly care about that: the point is to notify if
438            // it still exists, but we don't need to know if Suspense is no
439            // longer listening
440            _ = ready_tx.send(());
441        }
442
443        // notify reactive subscribers that we're not loading any more
444        for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
445            sub.mark_dirty();
446        }
447
448        // notify async .awaiters
449        for waker in mem::take(&mut *wakers.write().or_poisoned()) {
450            waker.wake();
451        }
452
453        // if this was marked dirty before notifications began, this means it
454        // had been notified while loading; marking it clean will cause it not to
455        // run on the next tick of the async loop, so here it should be left dirty
456        inner.write().or_poisoned().state = prev_state;
457    }
458}
459
460impl<T: 'static> ArcAsyncDerived<T> {
461    /// Creates a new async derived computation.
462    ///
463    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
464    /// as a new task.
465    #[track_caller]
466    pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
467    where
468        T: Send + Sync + 'static,
469        Fut: Future<Output = T> + Send + 'static,
470    {
471        Self::new_with_initial(None, fun)
472    }
473
474    /// Creates a new async derived computation with an initial value as a fallback, and begins running the
475    /// `Future` eagerly to get the actual first value.
476    #[track_caller]
477    pub fn new_with_initial<Fut>(
478        initial_value: Option<T>,
479        fun: impl Fn() -> Fut + Send + Sync + 'static,
480    ) -> Self
481    where
482        T: Send + Sync + 'static,
483        Fut: Future<Output = T> + Send + 'static,
484    {
485        let fun = move || {
486            let fut = fun();
487            let fut = async move { SendOption::new(Some(fut.await)) };
488            #[cfg(feature = "sandboxed-arenas")]
489            let fut = Sandboxed::new(fut);
490            fut
491        };
492        let initial_value = SendOption::new(initial_value);
493        let (this, _) = spawn_derived!(
494            crate::spawn,
495            initial_value,
496            fun,
497            true,
498            true,
499            true,
500            None::<ArcTrigger>
501        );
502        this
503    }
504
505    /// Creates a new async derived computation with an initial value, and does not spawn a task
506    /// initially.
507    ///
508    /// This is mostly used with manual dependency tracking, for primitives built on top of this
509    /// where you do not want to run the run the `Future` unnecessarily.
510    #[doc(hidden)]
511    #[track_caller]
512    pub fn new_with_manual_dependencies<Fut, S>(
513        initial_value: Option<T>,
514        fun: impl Fn() -> Fut + Send + Sync + 'static,
515        source: &S,
516    ) -> Self
517    where
518        T: Send + Sync + 'static,
519        Fut: Future<Output = T> + Send + 'static,
520        S: Track,
521    {
522        let fun = move || {
523            let fut = fun();
524            let fut = ScopedFuture::new_untracked(async move {
525                SendOption::new(Some(fut.await))
526            });
527            #[cfg(feature = "sandboxed-arenas")]
528            let fut = Sandboxed::new(fut);
529            fut
530        };
531        let initial_value = SendOption::new(initial_value);
532        let (this, _) = spawn_derived!(
533            crate::spawn,
534            initial_value,
535            fun,
536            true,
537            false,
538            false,
539            Some(source)
540        );
541        this
542    }
543
544    /// Creates a new async derived computation that will be guaranteed to run on the current
545    /// thread.
546    ///
547    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
548    /// as a new task.
549    #[track_caller]
550    pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
551    where
552        T: 'static,
553        Fut: Future<Output = T> + 'static,
554    {
555        Self::new_unsync_with_initial(None, fun)
556    }
557
558    /// Creates a new async derived computation with an initial value as a fallback, and begins running the
559    /// `Future` eagerly to get the actual first value.
560    #[track_caller]
561    pub fn new_unsync_with_initial<Fut>(
562        initial_value: Option<T>,
563        fun: impl Fn() -> Fut + 'static,
564    ) -> Self
565    where
566        T: 'static,
567        Fut: Future<Output = T> + 'static,
568    {
569        let fun = move || {
570            let fut = fun();
571            let fut = async move { SendOption::new_local(Some(fut.await)) };
572            #[cfg(feature = "sandboxed-arenas")]
573            let fut = Sandboxed::new(fut);
574            fut
575        };
576        let initial_value = SendOption::new_local(initial_value);
577        let (this, _) = spawn_derived!(
578            crate::spawn_local,
579            initial_value,
580            fun,
581            true,
582            true,
583            true,
584            None::<ArcTrigger>
585        );
586        this
587    }
588
589    /// Returns a `Future` that is ready when this resource has next finished loading.
590    pub fn ready(&self) -> AsyncDerivedReadyFuture {
591        AsyncDerivedReadyFuture::new(
592            self.to_any_source(),
593            &self.loading,
594            &self.wakers,
595        )
596    }
597}
598
599impl<T: 'static> ArcAsyncDerived<T> {
600    #[doc(hidden)]
601    #[track_caller]
602    pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
603    where
604        T: 'static,
605        Fut: Future<Output = T> + 'static,
606    {
607        let initial = SendOption::new_local(None::<T>);
608        let fun = move || {
609            let fut = fun();
610            let fut = async move { SendOption::new_local(Some(fut.await)) };
611            #[cfg(feature = "sandboxed-arenas")]
612            let fut = Sandboxed::new(fut);
613            fut
614        };
615        let (this, _) = spawn_derived!(
616            crate::spawn_local,
617            initial,
618            fun,
619            false,
620            false,
621            true,
622            None::<ArcTrigger>
623        );
624        this
625    }
626}
627
628impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
629    type Value =
630        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
631
632    fn try_read_untracked(&self) -> Option<Self::Value> {
633        if let Some(suspense_context) = use_context::<SuspenseContext>() {
634            let handle = suspense_context.task_id();
635            let ready = SpecialNonReactiveFuture::new(self.ready());
636            crate::spawn(async move {
637                ready.await;
638                drop(handle);
639            });
640            self.inner
641                .write()
642                .or_poisoned()
643                .suspenses
644                .push(suspense_context);
645        }
646        AsyncPlain::try_new(&self.value).map(|plain| {
647            ReadGuard::new(Mapped::new_with_guard(plain, |v| v.deref()))
648        })
649    }
650}
651
652impl<T: 'static> Notify for ArcAsyncDerived<T> {
653    fn notify(&self) {
654        Self::notify_subs(&self.wakers, &self.inner, &self.loading, None);
655    }
656}
657
658impl<T: 'static> Write for ArcAsyncDerived<T> {
659    type Value = Option<T>;
660
661    fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
662        // increment the version, such that a rerun triggered previously does not overwrite this
663        // new value
664        let mut guard = self.inner.write().or_poisoned();
665        guard.version += 1;
666
667        // tell any suspenses to stop waiting for this
668        drop(mem::take(&mut guard.pending_suspenses));
669
670        Some(MappedMut::new(
671            WriteGuard::new(self.clone(), self.value.blocking_write()),
672            |v| v.deref(),
673            |v| v.deref_mut(),
674        ))
675    }
676
677    fn try_write_untracked(
678        &self,
679    ) -> Option<impl DerefMut<Target = Self::Value>> {
680        // increment the version, such that a rerun triggered previously does not overwrite this
681        // new value
682        let mut guard = self.inner.write().or_poisoned();
683        guard.version += 1;
684
685        // tell any suspenses to stop waiting for this
686        drop(mem::take(&mut guard.pending_suspenses));
687
688        Some(MappedMut::new(
689            self.value.blocking_write(),
690            |v| v.deref(),
691            |v| v.deref_mut(),
692        ))
693    }
694}
695
impl<T: 'static> IsDisposed for ArcAsyncDerived<T> {
    /// Always returns `false`.
    // NOTE(review): presumably because a reference-counted value stays valid
    // for as long as a handle to it exists — confirm against the arena-backed type.
    #[inline(always)]
    fn is_disposed(&self) -> bool {
        false
    }
}
702
703impl<T: 'static> ToAnySource for ArcAsyncDerived<T> {
704    fn to_any_source(&self) -> AnySource {
705        AnySource(
706            Arc::as_ptr(&self.inner) as usize,
707            Arc::downgrade(&self.inner) as Weak<dyn Source + Send + Sync>,
708            #[cfg(any(debug_assertions, leptos_debuginfo))]
709            self.defined_at,
710        )
711    }
712}
713
714impl<T: 'static> ToAnySubscriber for ArcAsyncDerived<T> {
715    fn to_any_subscriber(&self) -> AnySubscriber {
716        AnySubscriber(
717            Arc::as_ptr(&self.inner) as usize,
718            Arc::downgrade(&self.inner) as Weak<dyn Subscriber + Send + Sync>,
719        )
720    }
721}
722
// `Source` is implemented by forwarding every call to the shared `inner` state.
impl<T> Source for ArcAsyncDerived<T> {
    /// Forwards to `inner.add_subscriber`.
    fn add_subscriber(&self, subscriber: AnySubscriber) {
        self.inner.add_subscriber(subscriber);
    }

    /// Forwards to `inner.remove_subscriber`.
    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
        self.inner.remove_subscriber(subscriber);
    }

    /// Forwards to `inner.clear_subscribers`.
    fn clear_subscribers(&self) {
        self.inner.clear_subscribers();
    }
}
736
// `ReactiveNode` is implemented by forwarding every call to the shared `inner` state.
impl<T> ReactiveNode for ArcAsyncDerived<T> {
    /// Forwards to `inner.mark_dirty`.
    fn mark_dirty(&self) {
        self.inner.mark_dirty();
    }

    /// Forwards to `inner.mark_check`.
    fn mark_check(&self) {
        self.inner.mark_check();
    }

    /// Forwards to `inner.mark_subscribers_check`.
    fn mark_subscribers_check(&self) {
        self.inner.mark_subscribers_check();
    }

    /// Forwards to `inner.update_if_necessary` and returns its result.
    fn update_if_necessary(&self) -> bool {
        self.inner.update_if_necessary()
    }
}
754
// `Subscriber` is implemented by forwarding every call to the shared `inner` state.
impl<T> Subscriber for ArcAsyncDerived<T> {
    /// Forwards to `inner.add_source`.
    fn add_source(&self, source: AnySource) {
        self.inner.add_source(source);
    }

    /// Forwards to `inner.clear_sources`.
    fn clear_sources(&self, subscriber: &AnySubscriber) {
        self.inner.clear_sources(subscriber);
    }
}