reactive_graph/computed/async_derived/
future_impls.rs

1use super::{inner::ArcAsyncDerivedInner, ArcAsyncDerived, AsyncDerived};
2use crate::{
3    computed::suspense::SuspenseContext,
4    diagnostics::SpecialNonReactiveZone,
5    graph::{AnySource, ToAnySource},
6    owner::{use_context, Storage},
7    send_wrapper_ext::SendOption,
8    signal::guards::{AsyncPlain, Mapped, ReadGuard},
9    traits::{DefinedAt, Track},
10    unwrap_signal,
11};
12use futures::pin_mut;
13use or_poisoned::OrPoisoned;
14use std::{
15    future::{Future, IntoFuture},
16    pin::Pin,
17    sync::{
18        atomic::{AtomicBool, Ordering},
19        Arc, RwLock,
20    },
21    task::{Context, Poll, Waker},
22};
23
24/// A read guard that holds access to an async derived resource.
25///
26/// Implements [`Deref`](std::ops::Deref) to access the inner value. This should not be held longer
27/// than it is needed, as it prevents updates to the inner value.
28pub type AsyncDerivedGuard<T> =
29    ReadGuard<T, Mapped<AsyncPlain<SendOption<T>>, T>>;
30
31/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
32/// but does not contain its value.
33pub struct AsyncDerivedReadyFuture {
34    pub(crate) source: AnySource,
35    pub(crate) loading: Arc<AtomicBool>,
36    pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
37}
38
39impl AsyncDerivedReadyFuture {
40    /// Creates a new [`Future`] that will be ready when the given resource is ready.
41    pub fn new(
42        source: AnySource,
43        loading: &Arc<AtomicBool>,
44        wakers: &Arc<RwLock<Vec<Waker>>>,
45    ) -> Self {
46        AsyncDerivedReadyFuture {
47            source,
48            loading: Arc::clone(loading),
49            wakers: Arc::clone(wakers),
50        }
51    }
52}
53
54impl Future for AsyncDerivedReadyFuture {
55    type Output = ();
56
57    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
58        #[cfg(debug_assertions)]
59        let _guard = SpecialNonReactiveZone::enter();
60        let waker = cx.waker();
61        self.source.track();
62        if self.loading.load(Ordering::Relaxed) {
63            self.wakers.write().or_poisoned().push(waker.clone());
64            Poll::Pending
65        } else {
66            Poll::Ready(())
67        }
68    }
69}
70
71impl<T> IntoFuture for ArcAsyncDerived<T>
72where
73    T: Clone + 'static,
74{
75    type Output = T;
76    type IntoFuture = AsyncDerivedFuture<T>;
77
78    fn into_future(self) -> Self::IntoFuture {
79        AsyncDerivedFuture {
80            source: self.to_any_source(),
81            value: Arc::clone(&self.value),
82            loading: Arc::clone(&self.loading),
83            wakers: Arc::clone(&self.wakers),
84            inner: Arc::clone(&self.inner),
85        }
86    }
87}
88
89impl<T, S> IntoFuture for AsyncDerived<T, S>
90where
91    T: Clone + 'static,
92    S: Storage<ArcAsyncDerived<T>>,
93{
94    type Output = T;
95    type IntoFuture = AsyncDerivedFuture<T>;
96
97    #[track_caller]
98    fn into_future(self) -> Self::IntoFuture {
99        let this = self
100            .inner
101            .try_get_value()
102            .unwrap_or_else(unwrap_signal!(self));
103        this.into_future()
104    }
105}
106
107/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
108/// and contains its value. `.await`ing this clones the value `T`.
109pub struct AsyncDerivedFuture<T> {
110    source: AnySource,
111    value: Arc<async_lock::RwLock<SendOption<T>>>,
112    loading: Arc<AtomicBool>,
113    wakers: Arc<RwLock<Vec<Waker>>>,
114    inner: Arc<RwLock<ArcAsyncDerivedInner>>,
115}
116
117impl<T> Future for AsyncDerivedFuture<T>
118where
119    T: Clone + 'static,
120{
121    type Output = T;
122
123    #[track_caller]
124    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
125        #[cfg(debug_assertions)]
126        let _guard = SpecialNonReactiveZone::enter();
127        let waker = cx.waker();
128        self.source.track();
129        let value = self.value.read_arc();
130
131        if let Some(suspense_context) = use_context::<SuspenseContext>() {
132            self.inner
133                .write()
134                .or_poisoned()
135                .suspenses
136                .push(suspense_context);
137        }
138
139        pin_mut!(value);
140        match (self.loading.load(Ordering::Relaxed), value.poll(cx)) {
141            (true, _) => {
142                self.wakers.write().or_poisoned().push(waker.clone());
143                Poll::Pending
144            }
145            (_, Poll::Pending) => Poll::Pending,
146            (_, Poll::Ready(guard)) => {
147                Poll::Ready(guard.as_ref().unwrap().clone())
148            }
149        }
150    }
151}
152
153impl<T: 'static> ArcAsyncDerived<T> {
154    /// Returns a `Future` that resolves when the computation is finished, and accesses the inner
155    /// value by reference rather than by cloning it.
156    #[track_caller]
157    pub fn by_ref(&self) -> AsyncDerivedRefFuture<T> {
158        AsyncDerivedRefFuture {
159            source: self.to_any_source(),
160            value: Arc::clone(&self.value),
161            loading: Arc::clone(&self.loading),
162            wakers: Arc::clone(&self.wakers),
163        }
164    }
165}
166
167impl<T, S> AsyncDerived<T, S>
168where
169    T: 'static,
170    S: Storage<ArcAsyncDerived<T>>,
171{
172    /// Returns a `Future` that resolves when the computation is finished, and accesses the inner
173    /// value by reference rather than by cloning it.
174    #[track_caller]
175    pub fn by_ref(&self) -> AsyncDerivedRefFuture<T> {
176        let this = self
177            .inner
178            .try_get_value()
179            .unwrap_or_else(unwrap_signal!(self));
180        this.by_ref()
181    }
182}
183
184/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
185/// and yields an [`AsyncDerivedGuard`] that dereferences to its value.
186pub struct AsyncDerivedRefFuture<T> {
187    source: AnySource,
188    value: Arc<async_lock::RwLock<SendOption<T>>>,
189    loading: Arc<AtomicBool>,
190    wakers: Arc<RwLock<Vec<Waker>>>,
191}
192
193impl<T> Future for AsyncDerivedRefFuture<T>
194where
195    T: 'static,
196{
197    type Output = AsyncDerivedGuard<T>;
198
199    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200        #[cfg(debug_assertions)]
201        let _guard = SpecialNonReactiveZone::enter();
202        let waker = cx.waker();
203        self.source.track();
204        let value = self.value.read_arc();
205        pin_mut!(value);
206        match (self.loading.load(Ordering::Relaxed), value.poll(cx)) {
207            (true, _) => {
208                self.wakers.write().or_poisoned().push(waker.clone());
209                Poll::Pending
210            }
211            (_, Poll::Pending) => Poll::Pending,
212            (_, Poll::Ready(guard)) => Poll::Ready(ReadGuard::new(
213                Mapped::new_with_guard(AsyncPlain { guard }, |guard| {
214                    guard.as_ref().unwrap()
215                }),
216            )),
217        }
218    }
219}