reactive_graph/computed/async_derived/
async_derived.rs

1use super::{ArcAsyncDerived, AsyncDerivedReadyFuture, BlockingLock};
2use crate::{
3    graph::{
4        AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
5        ToAnySource, ToAnySubscriber,
6    },
7    owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
8    send_wrapper_ext::SendOption,
9    signal::guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
10    traits::{
11        DefinedAt, Dispose, IsDisposed, Notify, ReadUntracked,
12        UntrackableGuard, Write,
13    },
14    unwrap_signal,
15};
16use core::fmt::Debug;
17use or_poisoned::OrPoisoned;
18use std::{
19    future::Future,
20    mem,
21    ops::{Deref, DerefMut},
22    panic::Location,
23};
24
25/// A reactive value that is derived by running an asynchronous computation in response to changes
26/// in its sources.
27///
28/// When one of its dependencies changes, this will re-run its async computation, then notify other
29/// values that depend on it that it has changed.
30///
31/// This is an arena-allocated type, which is `Copy` and is disposed when its reactive
32/// [`Owner`](crate::owner::Owner) cleans up. For a reference-counted signal that lives as
33/// as long as a reference to it is alive, see [`ArcAsyncDerived`].
34///
35/// ## Examples
36/// ```rust
37/// # use reactive_graph::computed::*;
38/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
39/// # use reactive_graph::prelude::*;
40/// # tokio_test::block_on(async move {
41/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
42/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
43///
44/// let signal1 = RwSignal::new(0);
45/// let signal2 = RwSignal::new(0);
46/// let derived = AsyncDerived::new(move || async move {
47///   // reactive values can be tracked anywhere in the `async` block
48///   let value1 = signal1.get();
49///   tokio::time::sleep(std::time::Duration::from_millis(25)).await;
50///   let value2 = signal2.get();
51///
52///   value1 + value2
53/// });
54///
55/// // the value can be accessed synchronously as `Option<T>`
56/// assert_eq!(derived.get(), None);
57/// // we can also .await the value, i.e., convert it into a Future
58/// assert_eq!(derived.await, 0);
59/// assert_eq!(derived.get(), Some(0));
60///
61/// signal1.set(1);
62/// // while the new value is still pending, the signal holds the old value
63/// tokio::time::sleep(std::time::Duration::from_millis(5)).await;
64/// assert_eq!(derived.get(), Some(0));
65///
66/// // setting multiple dependencies will hold until the latest change is ready
67/// signal2.set(1);
68/// assert_eq!(derived.await, 2);
69/// # });
70/// ```
71///
72/// ## Core Trait Implementations
73/// - [`.get()`](crate::traits::Get) clones the current value as an `Option<T>`.
74///   If you call it within an effect, it will cause that effect to subscribe
75///   to the memo, and to re-run whenever the value of the memo changes.
76///   - [`.get_untracked()`](crate::traits::GetUntracked) clones the value of
77///     without reactively tracking it.
78/// - [`.read()`](crate::traits::Read) returns a guard that allows accessing the
79///   value by reference. If you call it within an effect, it will
80///   cause that effect to subscribe to the memo, and to re-run whenever the
81///   value changes.
82///   - [`.read_untracked()`](crate::traits::ReadUntracked) gives access to the
83///     current value without reactively tracking it.
84/// - [`.with()`](crate::traits::With) allows you to reactively access the
85///   value without cloning by applying a callback function.
86///   - [`.with_untracked()`](crate::traits::WithUntracked) allows you to access
87///     the value by applying a callback function without reactively
88///     tracking it.
89/// - [`IntoFuture`](std::future::Future) allows you to create a [`Future`] that resolves
90///   when this resource is done loading.
91pub struct AsyncDerived<T, S = SyncStorage> {
92    #[cfg(any(debug_assertions, leptos_debuginfo))]
93    defined_at: &'static Location<'static>,
94    pub(crate) inner: ArenaItem<ArcAsyncDerived<T>, S>,
95}
96
97impl<T, S> Dispose for AsyncDerived<T, S> {
98    fn dispose(self) {
99        self.inner.dispose()
100    }
101}
102
103impl<T, S> From<ArcAsyncDerived<T>> for AsyncDerived<T, S>
104where
105    T: 'static,
106    S: Storage<ArcAsyncDerived<T>>,
107{
108    fn from(value: ArcAsyncDerived<T>) -> Self {
109        #[cfg(any(debug_assertions, leptos_debuginfo))]
110        let defined_at = value.defined_at;
111        Self {
112            #[cfg(any(debug_assertions, leptos_debuginfo))]
113            defined_at,
114            inner: ArenaItem::new_with_storage(value),
115        }
116    }
117}
118
119impl<T, S> From<AsyncDerived<T, S>> for ArcAsyncDerived<T>
120where
121    T: 'static,
122    S: Storage<ArcAsyncDerived<T>>,
123{
124    #[track_caller]
125    fn from(value: AsyncDerived<T, S>) -> Self {
126        value
127            .inner
128            .try_get_value()
129            .unwrap_or_else(unwrap_signal!(value))
130    }
131}
132
133impl<T> FromLocal<ArcAsyncDerived<T>> for AsyncDerived<T, LocalStorage>
134where
135    T: 'static,
136{
137    fn from_local(value: ArcAsyncDerived<T>) -> Self {
138        #[cfg(any(debug_assertions, leptos_debuginfo))]
139        let defined_at = value.defined_at;
140        Self {
141            #[cfg(any(debug_assertions, leptos_debuginfo))]
142            defined_at,
143            inner: ArenaItem::new_with_storage(value),
144        }
145    }
146}
147
148impl<T> AsyncDerived<T>
149where
150    T: 'static,
151{
152    /// Creates a new async derived computation.
153    ///
154    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
155    /// as a new task.
156    #[track_caller]
157    pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
158    where
159        T: Send + Sync + 'static,
160        Fut: Future<Output = T> + Send + 'static,
161    {
162        Self {
163            #[cfg(any(debug_assertions, leptos_debuginfo))]
164            defined_at: Location::caller(),
165            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new(fun)),
166        }
167    }
168
169    /// Creates a new async derived computation with an initial value.
170    ///
171    /// If the initial value is `Some(_)`, the task will not be run initially.
172    pub fn new_with_initial<Fut>(
173        initial_value: Option<T>,
174        fun: impl Fn() -> Fut + Send + Sync + 'static,
175    ) -> Self
176    where
177        T: Send + Sync + 'static,
178        Fut: Future<Output = T> + Send + 'static,
179    {
180        Self {
181            #[cfg(any(debug_assertions, leptos_debuginfo))]
182            defined_at: Location::caller(),
183            inner: ArenaItem::new_with_storage(
184                ArcAsyncDerived::new_with_initial(initial_value, fun),
185            ),
186        }
187    }
188}
189
190impl<T> AsyncDerived<T> {
191    #[doc(hidden)]
192    pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
193    where
194        T: 'static,
195        Fut: Future<Output = T> + 'static,
196    {
197        Self {
198            #[cfg(any(debug_assertions, leptos_debuginfo))]
199            defined_at: Location::caller(),
200            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_mock(fun)),
201        }
202    }
203
204    /// Same as [`AsyncDerived::new_unsync`] except it produces AsyncDerived<T> instead of AsyncDerived<T, LocalStorage>.
205    /// The internal value will still be wrapped in a [`send_wrapper::SendWrapper`].
206    pub fn new_unsync_threadsafe_storage<Fut>(
207        fun: impl Fn() -> Fut + 'static,
208    ) -> Self
209    where
210        T: 'static,
211        Fut: Future<Output = T> + 'static,
212    {
213        Self {
214            #[cfg(any(debug_assertions, leptos_debuginfo))]
215            defined_at: Location::caller(),
216            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
217                fun,
218            )),
219        }
220    }
221}
222
223impl<T> AsyncDerived<T, LocalStorage>
224where
225    T: 'static,
226{
227    /// Creates a new async derived computation that will be guaranteed to run on the current
228    /// thread.
229    ///
230    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
231    /// as a new task.
232    pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
233    where
234        T: 'static,
235        Fut: Future<Output = T> + 'static,
236    {
237        Self {
238            #[cfg(any(debug_assertions, leptos_debuginfo))]
239            defined_at: Location::caller(),
240            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
241                fun,
242            )),
243        }
244    }
245
246    /// Creates a new async derived computation with an initial value. Async work will be
247    /// guaranteed to run only on the current thread.
248    ///
249    /// If the initial value is `Some(_)`, the task will not be run initially.
250    pub fn new_unsync_with_initial<Fut>(
251        initial_value: Option<T>,
252        fun: impl Fn() -> Fut + 'static,
253    ) -> Self
254    where
255        T: 'static,
256        Fut: Future<Output = T> + 'static,
257    {
258        Self {
259            #[cfg(any(debug_assertions, leptos_debuginfo))]
260            defined_at: Location::caller(),
261            inner: ArenaItem::new_with_storage(
262                ArcAsyncDerived::new_unsync_with_initial(initial_value, fun),
263            ),
264        }
265    }
266}
267
268impl<T, S> AsyncDerived<T, S>
269where
270    T: 'static,
271    S: Storage<ArcAsyncDerived<T>>,
272{
273    /// Returns a `Future` that is ready when this resource has next finished loading.
274    #[track_caller]
275    pub fn ready(&self) -> AsyncDerivedReadyFuture {
276        let this = self
277            .inner
278            .try_get_value()
279            .unwrap_or_else(unwrap_signal!(self));
280        this.ready()
281    }
282}
283
284impl<T, S> Copy for AsyncDerived<T, S> {}
285
286impl<T, S> Clone for AsyncDerived<T, S> {
287    fn clone(&self) -> Self {
288        *self
289    }
290}
291
292impl<T, S> Debug for AsyncDerived<T, S>
293where
294    S: Debug,
295{
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct("AsyncDerived")
298            .field("type", &std::any::type_name::<T>())
299            .field("store", &self.inner)
300            .finish()
301    }
302}
303
304impl<T, S> DefinedAt for AsyncDerived<T, S> {
305    #[inline(always)]
306    fn defined_at(&self) -> Option<&'static Location<'static>> {
307        #[cfg(any(debug_assertions, leptos_debuginfo))]
308        {
309            Some(self.defined_at)
310        }
311        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
312        {
313            None
314        }
315    }
316}
317
318impl<T, S> ReadUntracked for AsyncDerived<T, S>
319where
320    T: 'static,
321    S: Storage<ArcAsyncDerived<T>>,
322{
323    type Value =
324        ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
325
326    fn try_read_untracked(&self) -> Option<Self::Value> {
327        self.inner
328            .try_get_value()
329            .map(|inner| inner.read_untracked())
330    }
331}
332
333impl<T, S> Notify for AsyncDerived<T, S>
334where
335    T: 'static,
336    S: Storage<ArcAsyncDerived<T>>,
337{
338    fn notify(&self) {
339        self.inner.try_with_value(|inner| inner.notify());
340    }
341}
342
343impl<T, S> Write for AsyncDerived<T, S>
344where
345    T: 'static,
346    S: Storage<ArcAsyncDerived<T>>,
347{
348    type Value = Option<T>;
349
350    fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
351        let guard = self
352            .inner
353            .try_with_value(|n| n.value.blocking_write_arc())?;
354
355        self.inner.try_with_value(|n| {
356            let mut guard = n.inner.write().or_poisoned();
357            // increment the version, such that a rerun triggered previously does not overwrite this
358            // new value
359            guard.version += 1;
360
361            // tell any suspenses to stop waiting for this
362            drop(mem::take(&mut guard.pending_suspenses));
363        });
364
365        Some(MappedMut::new(
366            WriteGuard::new(*self, guard),
367            |v| v.deref(),
368            |v| v.deref_mut(),
369        ))
370    }
371
372    fn try_write_untracked(
373        &self,
374    ) -> Option<impl DerefMut<Target = Self::Value>> {
375        self.inner.try_with_value(|n| {
376            let mut guard = n.inner.write().or_poisoned();
377            // increment the version, such that a rerun triggered previously does not overwrite this
378            // new value
379            guard.version += 1;
380
381            // tell any suspenses to stop waiting for this
382            drop(mem::take(&mut guard.pending_suspenses));
383        });
384
385        self.inner
386            .try_with_value(|n| n.value.blocking_write_arc())
387            .map(|inner| {
388                MappedMut::new(inner, |v| v.deref(), |v| v.deref_mut())
389            })
390    }
391}
392
393impl<T, S> IsDisposed for AsyncDerived<T, S>
394where
395    T: 'static,
396    S: Storage<ArcAsyncDerived<T>>,
397{
398    fn is_disposed(&self) -> bool {
399        self.inner.is_disposed()
400    }
401}
402
403impl<T, S> ToAnySource for AsyncDerived<T, S>
404where
405    T: 'static,
406    S: Storage<ArcAsyncDerived<T>>,
407{
408    fn to_any_source(&self) -> AnySource {
409        self.inner
410            .try_get_value()
411            .map(|inner| inner.to_any_source())
412            .unwrap_or_else(unwrap_signal!(self))
413    }
414}
415
416impl<T, S> ToAnySubscriber for AsyncDerived<T, S>
417where
418    T: 'static,
419    S: Storage<ArcAsyncDerived<T>>,
420{
421    fn to_any_subscriber(&self) -> AnySubscriber {
422        self.inner
423            .try_get_value()
424            .map(|inner| inner.to_any_subscriber())
425            .unwrap_or_else(unwrap_signal!(self))
426    }
427}
428
429impl<T, S> Source for AsyncDerived<T, S>
430where
431    T: 'static,
432    S: Storage<ArcAsyncDerived<T>>,
433{
434    fn add_subscriber(&self, subscriber: AnySubscriber) {
435        if let Some(inner) = self.inner.try_get_value() {
436            inner.add_subscriber(subscriber);
437        }
438    }
439
440    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
441        if let Some(inner) = self.inner.try_get_value() {
442            inner.remove_subscriber(subscriber);
443        }
444    }
445
446    fn clear_subscribers(&self) {
447        if let Some(inner) = self.inner.try_get_value() {
448            inner.clear_subscribers();
449        }
450    }
451}
452
453impl<T, S> ReactiveNode for AsyncDerived<T, S>
454where
455    T: 'static,
456    S: Storage<ArcAsyncDerived<T>>,
457{
458    fn mark_dirty(&self) {
459        if let Some(inner) = self.inner.try_get_value() {
460            inner.mark_dirty();
461        }
462    }
463
464    fn mark_check(&self) {
465        if let Some(inner) = self.inner.try_get_value() {
466            inner.mark_check();
467        }
468    }
469
470    fn mark_subscribers_check(&self) {
471        if let Some(inner) = self.inner.try_get_value() {
472            inner.mark_subscribers_check();
473        }
474    }
475
476    fn update_if_necessary(&self) -> bool {
477        if let Some(inner) = self.inner.try_get_value() {
478            inner.update_if_necessary()
479        } else {
480            false
481        }
482    }
483}
484
485impl<T, S> Subscriber for AsyncDerived<T, S>
486where
487    T: 'static,
488    S: Storage<ArcAsyncDerived<T>>,
489{
490    fn add_source(&self, source: AnySource) {
491        if let Some(inner) = self.inner.try_get_value() {
492            inner.add_source(source);
493        }
494    }
495
496    fn clear_sources(&self, subscriber: &AnySubscriber) {
497        if let Some(inner) = self.inner.try_get_value() {
498            inner.clear_sources(subscriber);
499        }
500    }
501}