reactive_graph/computed/async_derived/
async_derived.rs

1use super::{ArcAsyncDerived, AsyncDerivedReadyFuture, BlockingLock};
2use crate::{
3    graph::{
4        AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
5        ToAnySource, ToAnySubscriber,
6    },
7    owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
8    signal::guards::{AsyncPlain, ReadGuard, WriteGuard},
9    traits::{
10        DefinedAt, Dispose, IsDisposed, Notify, ReadUntracked,
11        UntrackableGuard, Write,
12    },
13    unwrap_signal,
14};
15use core::fmt::Debug;
16use send_wrapper::SendWrapper;
17use std::{future::Future, ops::DerefMut, panic::Location};
18
19/// A reactive value that is derived by running an asynchronous computation in response to changes
20/// in its sources.
21///
22/// When one of its dependencies changes, this will re-run its async computation, then notify other
23/// values that depend on it that it has changed.
24///
25/// This is an arena-allocated type, which is `Copy` and is disposed when its reactive
/// [`Owner`](crate::owner::Owner) cleans up. For a reference-counted signal that lives
/// as long as a reference to it is alive, see [`ArcAsyncDerived`].
28///
29/// ## Examples
30/// ```rust
31/// # use reactive_graph::computed::*;
32/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
33/// # use reactive_graph::prelude::*;
34/// # tokio_test::block_on(async move {
35/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
36/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
37///
38/// let signal1 = RwSignal::new(0);
39/// let signal2 = RwSignal::new(0);
40/// let derived = AsyncDerived::new(move || async move {
41///   // reactive values can be tracked anywhere in the `async` block
42///   let value1 = signal1.get();
43///   tokio::time::sleep(std::time::Duration::from_millis(25)).await;
44///   let value2 = signal2.get();
45///
46///   value1 + value2
47/// });
48///
49/// // the value can be accessed synchronously as `Option<T>`
50/// assert_eq!(derived.get(), None);
51/// // we can also .await the value, i.e., convert it into a Future
52/// assert_eq!(derived.await, 0);
53/// assert_eq!(derived.get(), Some(0));
54///
55/// signal1.set(1);
56/// // while the new value is still pending, the signal holds the old value
57/// tokio::time::sleep(std::time::Duration::from_millis(5)).await;
58/// assert_eq!(derived.get(), Some(0));
59///
60/// // setting multiple dependencies will hold until the latest change is ready
61/// signal2.set(1);
62/// assert_eq!(derived.await, 2);
63/// # });
64/// ```
65///
66/// ## Core Trait Implementations
/// - [`.get()`](crate::traits::Get) clones the current value as an `Option<T>`.
///   If you call it within an effect, it will cause that effect to subscribe
///   to this value, and to re-run whenever the value changes.
///   - [`.get_untracked()`](crate::traits::GetUntracked) clones the current
///     value without reactively tracking it.
/// - [`.read()`](crate::traits::Read) returns a guard that allows accessing the
///   value by reference. If you call it within an effect, it will
///   cause that effect to subscribe to this value, and to re-run whenever the
///   value changes.
76///   - [`.read_untracked()`](crate::traits::ReadUntracked) gives access to the
77///     current value without reactively tracking it.
78/// - [`.with()`](crate::traits::With) allows you to reactively access the
79///   value without cloning by applying a callback function.
80///   - [`.with_untracked()`](crate::traits::WithUntracked) allows you to access
81///     the value by applying a callback function without reactively
82///     tracking it.
/// - [`IntoFuture`](std::future::IntoFuture) allows you to create a [`Future`] that resolves
///   when this resource is done loading.
85pub struct AsyncDerived<T, S = SyncStorage> {
86    #[cfg(any(debug_assertions, leptos_debuginfo))]
87    defined_at: &'static Location<'static>,
88    pub(crate) inner: ArenaItem<ArcAsyncDerived<T>, S>,
89}
90
91impl<T, S> Dispose for AsyncDerived<T, S> {
92    fn dispose(self) {
93        self.inner.dispose()
94    }
95}
96
97impl<T> From<ArcAsyncDerived<T>> for AsyncDerived<T>
98where
99    T: Send + Sync + 'static,
100{
101    fn from(value: ArcAsyncDerived<T>) -> Self {
102        #[cfg(any(debug_assertions, leptos_debuginfo))]
103        let defined_at = value.defined_at;
104        Self {
105            #[cfg(any(debug_assertions, leptos_debuginfo))]
106            defined_at,
107            inner: ArenaItem::new_with_storage(value),
108        }
109    }
110}
111
112impl<T> From<AsyncDerived<T>> for ArcAsyncDerived<T>
113where
114    T: Send + Sync + 'static,
115{
116    #[track_caller]
117    fn from(value: AsyncDerived<T>) -> Self {
118        value
119            .inner
120            .try_get_value()
121            .unwrap_or_else(unwrap_signal!(value))
122    }
123}
124
125impl<T> FromLocal<ArcAsyncDerived<T>> for AsyncDerived<T, LocalStorage>
126where
127    T: 'static,
128{
129    fn from_local(value: ArcAsyncDerived<T>) -> Self {
130        #[cfg(any(debug_assertions, leptos_debuginfo))]
131        let defined_at = value.defined_at;
132        Self {
133            #[cfg(any(debug_assertions, leptos_debuginfo))]
134            defined_at,
135            inner: ArenaItem::new_with_storage(value),
136        }
137    }
138}
139
140impl<T> AsyncDerived<T>
141where
142    T: 'static,
143{
144    /// Creates a new async derived computation.
145    ///
146    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
147    /// as a new task.
148    #[track_caller]
149    pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
150    where
151        T: Send + Sync + 'static,
152        Fut: Future<Output = T> + Send + 'static,
153    {
154        Self {
155            #[cfg(any(debug_assertions, leptos_debuginfo))]
156            defined_at: Location::caller(),
157            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new(fun)),
158        }
159    }
160
161    /// Creates a new async derived computation with an initial value.
162    ///
163    /// If the initial value is `Some(_)`, the task will not be run initially.
164    pub fn new_with_initial<Fut>(
165        initial_value: Option<T>,
166        fun: impl Fn() -> Fut + Send + Sync + 'static,
167    ) -> Self
168    where
169        T: Send + Sync + 'static,
170        Fut: Future<Output = T> + Send + 'static,
171    {
172        Self {
173            #[cfg(any(debug_assertions, leptos_debuginfo))]
174            defined_at: Location::caller(),
175            inner: ArenaItem::new_with_storage(
176                ArcAsyncDerived::new_with_initial(initial_value, fun),
177            ),
178        }
179    }
180}
181
182impl<T> AsyncDerived<SendWrapper<T>> {
183    #[doc(hidden)]
184    pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
185    where
186        T: 'static,
187        Fut: Future<Output = T> + 'static,
188    {
189        Self {
190            #[cfg(any(debug_assertions, leptos_debuginfo))]
191            defined_at: Location::caller(),
192            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_mock(fun)),
193        }
194    }
195}
196
197impl<T> AsyncDerived<T, LocalStorage>
198where
199    T: 'static,
200{
201    /// Creates a new async derived computation that will be guaranteed to run on the current
202    /// thread.
203    ///
204    /// This runs eagerly: i.e., calls `fun` once when created and immediately spawns the `Future`
205    /// as a new task.
206    pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
207    where
208        T: 'static,
209        Fut: Future<Output = T> + 'static,
210    {
211        Self {
212            #[cfg(any(debug_assertions, leptos_debuginfo))]
213            defined_at: Location::caller(),
214            inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
215                fun,
216            )),
217        }
218    }
219
220    /// Creates a new async derived computation with an initial value. Async work will be
221    /// guaranteed to run only on the current thread.
222    ///
223    /// If the initial value is `Some(_)`, the task will not be run initially.
224    pub fn new_unsync_with_initial<Fut>(
225        initial_value: Option<T>,
226        fun: impl Fn() -> Fut + 'static,
227    ) -> Self
228    where
229        T: 'static,
230        Fut: Future<Output = T> + 'static,
231    {
232        Self {
233            #[cfg(any(debug_assertions, leptos_debuginfo))]
234            defined_at: Location::caller(),
235            inner: ArenaItem::new_with_storage(
236                ArcAsyncDerived::new_unsync_with_initial(initial_value, fun),
237            ),
238        }
239    }
240}
241
242impl<T, S> AsyncDerived<T, S>
243where
244    T: 'static,
245    S: Storage<ArcAsyncDerived<T>>,
246{
247    /// Returns a `Future` that is ready when this resource has next finished loading.
248    #[track_caller]
249    pub fn ready(&self) -> AsyncDerivedReadyFuture {
250        let this = self
251            .inner
252            .try_get_value()
253            .unwrap_or_else(unwrap_signal!(self));
254        this.ready()
255    }
256}
257
258impl<T, S> Copy for AsyncDerived<T, S> {}
259
260impl<T, S> Clone for AsyncDerived<T, S> {
261    fn clone(&self) -> Self {
262        *self
263    }
264}
265
266impl<T, S> Debug for AsyncDerived<T, S>
267where
268    S: Debug,
269{
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        f.debug_struct("AsyncDerived")
272            .field("type", &std::any::type_name::<T>())
273            .field("store", &self.inner)
274            .finish()
275    }
276}
277
278impl<T, S> DefinedAt for AsyncDerived<T, S> {
279    #[inline(always)]
280    fn defined_at(&self) -> Option<&'static Location<'static>> {
281        #[cfg(any(debug_assertions, leptos_debuginfo))]
282        {
283            Some(self.defined_at)
284        }
285        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
286        {
287            None
288        }
289    }
290}
291
292impl<T, S> ReadUntracked for AsyncDerived<T, S>
293where
294    T: 'static,
295    S: Storage<ArcAsyncDerived<T>>,
296{
297    type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
298
299    fn try_read_untracked(&self) -> Option<Self::Value> {
300        self.inner
301            .try_get_value()
302            .map(|inner| inner.read_untracked())
303    }
304}
305
306impl<T, S> Notify for AsyncDerived<T, S>
307where
308    T: 'static,
309    S: Storage<ArcAsyncDerived<T>>,
310{
311    fn notify(&self) {
312        self.inner.try_with_value(|inner| inner.notify());
313    }
314}
315
316impl<T, S> Write for AsyncDerived<T, S>
317where
318    T: 'static,
319    S: Storage<ArcAsyncDerived<T>>,
320{
321    type Value = Option<T>;
322
323    fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
324        let guard = self
325            .inner
326            .try_with_value(|n| n.value.blocking_write_arc())?;
327        Some(WriteGuard::new(*self, guard))
328    }
329
330    fn try_write_untracked(
331        &self,
332    ) -> Option<impl DerefMut<Target = Self::Value>> {
333        self.inner.try_with_value(|n| n.value.blocking_write_arc())
334    }
335}
336
337impl<T, S> IsDisposed for AsyncDerived<T, S>
338where
339    T: 'static,
340    S: Storage<ArcAsyncDerived<T>>,
341{
342    fn is_disposed(&self) -> bool {
343        self.inner.is_disposed()
344    }
345}
346
347impl<T, S> ToAnySource for AsyncDerived<T, S>
348where
349    T: 'static,
350    S: Storage<ArcAsyncDerived<T>>,
351{
352    fn to_any_source(&self) -> AnySource {
353        self.inner
354            .try_get_value()
355            .map(|inner| inner.to_any_source())
356            .unwrap_or_else(unwrap_signal!(self))
357    }
358}
359
360impl<T, S> ToAnySubscriber for AsyncDerived<T, S>
361where
362    T: 'static,
363    S: Storage<ArcAsyncDerived<T>>,
364{
365    fn to_any_subscriber(&self) -> AnySubscriber {
366        self.inner
367            .try_get_value()
368            .map(|inner| inner.to_any_subscriber())
369            .unwrap_or_else(unwrap_signal!(self))
370    }
371}
372
373impl<T, S> Source for AsyncDerived<T, S>
374where
375    T: 'static,
376    S: Storage<ArcAsyncDerived<T>>,
377{
378    fn add_subscriber(&self, subscriber: AnySubscriber) {
379        if let Some(inner) = self.inner.try_get_value() {
380            inner.add_subscriber(subscriber);
381        }
382    }
383
384    fn remove_subscriber(&self, subscriber: &AnySubscriber) {
385        if let Some(inner) = self.inner.try_get_value() {
386            inner.remove_subscriber(subscriber);
387        }
388    }
389
390    fn clear_subscribers(&self) {
391        if let Some(inner) = self.inner.try_get_value() {
392            inner.clear_subscribers();
393        }
394    }
395}
396
397impl<T, S> ReactiveNode for AsyncDerived<T, S>
398where
399    T: 'static,
400    S: Storage<ArcAsyncDerived<T>>,
401{
402    fn mark_dirty(&self) {
403        if let Some(inner) = self.inner.try_get_value() {
404            inner.mark_dirty();
405        }
406    }
407
408    fn mark_check(&self) {
409        if let Some(inner) = self.inner.try_get_value() {
410            inner.mark_check();
411        }
412    }
413
414    fn mark_subscribers_check(&self) {
415        if let Some(inner) = self.inner.try_get_value() {
416            inner.mark_subscribers_check();
417        }
418    }
419
420    fn update_if_necessary(&self) -> bool {
421        if let Some(inner) = self.inner.try_get_value() {
422            inner.update_if_necessary()
423        } else {
424            false
425        }
426    }
427}
428
429impl<T, S> Subscriber for AsyncDerived<T, S>
430where
431    T: 'static,
432    S: Storage<ArcAsyncDerived<T>>,
433{
434    fn add_source(&self, source: AnySource) {
435        if let Some(inner) = self.inner.try_get_value() {
436            inner.add_source(source);
437        }
438    }
439
440    fn clear_sources(&self, subscriber: &AnySubscriber) {
441        if let Some(inner) = self.inner.try_get_value() {
442            inner.clear_sources(subscriber);
443        }
444    }
445}