reactive_graph/computed/async_derived/
future_impls.rs1use super::{inner::ArcAsyncDerivedInner, ArcAsyncDerived, AsyncDerived};
2use crate::{
3 computed::suspense::SuspenseContext,
4 diagnostics::SpecialNonReactiveZone,
5 graph::{AnySource, ToAnySource},
6 owner::{use_context, Storage},
7 send_wrapper_ext::SendOption,
8 signal::guards::{AsyncPlain, Mapped, ReadGuard},
9 traits::{DefinedAt, Track},
10 unwrap_signal,
11};
12use futures::pin_mut;
13use or_poisoned::OrPoisoned;
14use std::{
15 future::{Future, IntoFuture},
16 pin::Pin,
17 sync::{
18 atomic::{AtomicBool, Ordering},
19 Arc, RwLock,
20 },
21 task::{Context, Poll, Waker},
22};
23
24pub type AsyncDerivedGuard<T> =
29 ReadGuard<T, Mapped<AsyncPlain<SendOption<T>>, T>>;
30
31pub struct AsyncDerivedReadyFuture {
34 pub(crate) source: AnySource,
35 pub(crate) loading: Arc<AtomicBool>,
36 pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
37}
38
39impl AsyncDerivedReadyFuture {
40 pub fn new(
42 source: AnySource,
43 loading: &Arc<AtomicBool>,
44 wakers: &Arc<RwLock<Vec<Waker>>>,
45 ) -> Self {
46 AsyncDerivedReadyFuture {
47 source,
48 loading: Arc::clone(loading),
49 wakers: Arc::clone(wakers),
50 }
51 }
52}
53
54impl Future for AsyncDerivedReadyFuture {
55 type Output = ();
56
57 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
58 #[cfg(debug_assertions)]
59 let _guard = SpecialNonReactiveZone::enter();
60 let waker = cx.waker();
61 self.source.track();
62 if self.loading.load(Ordering::Relaxed) {
63 self.wakers.write().or_poisoned().push(waker.clone());
64 Poll::Pending
65 } else {
66 Poll::Ready(())
67 }
68 }
69}
70
71impl<T> IntoFuture for ArcAsyncDerived<T>
72where
73 T: Clone + 'static,
74{
75 type Output = T;
76 type IntoFuture = AsyncDerivedFuture<T>;
77
78 fn into_future(self) -> Self::IntoFuture {
79 AsyncDerivedFuture {
80 source: self.to_any_source(),
81 value: Arc::clone(&self.value),
82 loading: Arc::clone(&self.loading),
83 wakers: Arc::clone(&self.wakers),
84 inner: Arc::clone(&self.inner),
85 }
86 }
87}
88
89impl<T, S> IntoFuture for AsyncDerived<T, S>
90where
91 T: Clone + 'static,
92 S: Storage<ArcAsyncDerived<T>>,
93{
94 type Output = T;
95 type IntoFuture = AsyncDerivedFuture<T>;
96
97 #[track_caller]
98 fn into_future(self) -> Self::IntoFuture {
99 let this = self
100 .inner
101 .try_get_value()
102 .unwrap_or_else(unwrap_signal!(self));
103 this.into_future()
104 }
105}
106
107pub struct AsyncDerivedFuture<T> {
110 source: AnySource,
111 value: Arc<async_lock::RwLock<SendOption<T>>>,
112 loading: Arc<AtomicBool>,
113 wakers: Arc<RwLock<Vec<Waker>>>,
114 inner: Arc<RwLock<ArcAsyncDerivedInner>>,
115}
116
117impl<T> Future for AsyncDerivedFuture<T>
118where
119 T: Clone + 'static,
120{
121 type Output = T;
122
123 #[track_caller]
124 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
125 #[cfg(debug_assertions)]
126 let _guard = SpecialNonReactiveZone::enter();
127 let waker = cx.waker();
128 self.source.track();
129 let value = self.value.read_arc();
130
131 if let Some(suspense_context) = use_context::<SuspenseContext>() {
132 self.inner
133 .write()
134 .or_poisoned()
135 .suspenses
136 .push(suspense_context);
137 }
138
139 pin_mut!(value);
140 match (self.loading.load(Ordering::Relaxed), value.poll(cx)) {
141 (true, _) => {
142 self.wakers.write().or_poisoned().push(waker.clone());
143 Poll::Pending
144 }
145 (_, Poll::Pending) => Poll::Pending,
146 (_, Poll::Ready(guard)) => {
147 Poll::Ready(guard.as_ref().unwrap().clone())
148 }
149 }
150 }
151}
152
153impl<T: 'static> ArcAsyncDerived<T> {
154 #[track_caller]
157 pub fn by_ref(&self) -> AsyncDerivedRefFuture<T> {
158 AsyncDerivedRefFuture {
159 source: self.to_any_source(),
160 value: Arc::clone(&self.value),
161 loading: Arc::clone(&self.loading),
162 wakers: Arc::clone(&self.wakers),
163 }
164 }
165}
166
167impl<T, S> AsyncDerived<T, S>
168where
169 T: 'static,
170 S: Storage<ArcAsyncDerived<T>>,
171{
172 #[track_caller]
175 pub fn by_ref(&self) -> AsyncDerivedRefFuture<T> {
176 let this = self
177 .inner
178 .try_get_value()
179 .unwrap_or_else(unwrap_signal!(self));
180 this.by_ref()
181 }
182}
183
184pub struct AsyncDerivedRefFuture<T> {
187 source: AnySource,
188 value: Arc<async_lock::RwLock<SendOption<T>>>,
189 loading: Arc<AtomicBool>,
190 wakers: Arc<RwLock<Vec<Waker>>>,
191}
192
193impl<T> Future for AsyncDerivedRefFuture<T>
194where
195 T: 'static,
196{
197 type Output = AsyncDerivedGuard<T>;
198
199 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200 #[cfg(debug_assertions)]
201 let _guard = SpecialNonReactiveZone::enter();
202 let waker = cx.waker();
203 self.source.track();
204 let value = self.value.read_arc();
205 pin_mut!(value);
206 match (self.loading.load(Ordering::Relaxed), value.poll(cx)) {
207 (true, _) => {
208 self.wakers.write().or_poisoned().push(waker.clone());
209 Poll::Pending
210 }
211 (_, Poll::Pending) => Poll::Pending,
212 (_, Poll::Ready(guard)) => Poll::Ready(ReadGuard::new(
213 Mapped::new_with_guard(AsyncPlain { guard }, |guard| {
214 guard.as_ref().unwrap()
215 }),
216 )),
217 }
218 }
219}